diff --git a/CHANGELOG.md b/CHANGELOG.md index 45a16f7a..4185e872 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,15 +4,19 @@ OhMyThreads.jl Changelog Version 0.5.0 ------------- +- ![Feature][badge-feature] The parallel functions (e.g. tmapreduce etc.) now support `scheduler::Symbol` besides `scheduler::Scheduler`. To configure the selected scheduler (e.g. set `nchunks` etc.) one may now pass keyword arguments directly into the parallel functions (they will get passed on to the scheduler constructor). Example: `tmapreduce(sin, +, 1:10; chunksize=2, scheduler=:static)`. Analogous support has been added to the macro API: (Most) settings (`@set name = value`) will now be passed on to the parallel functions as keyword arguments (which then forward them to the scheduler constructor). Note that, to avoid ambiguity, we don't support this feature for `scheduler::Scheduler` but only for `scheduler::Symbol`. - ![Feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading. - ![Feature][badge-feature] Added `OhMyThreads.WithTaskLocals` that represents a closure over `TaskLocalValues`, but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`) - ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error). - ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjunction with `@set reducer=...` and triggers a warning otherwise). +- ![Enhancement][badge-enhancement] `SerialScheduler` and `DynamicScheduler` now support the keyword argument `ntasks` as an alias for `nchunks`. - ![Enhancement][badge-enhancement] Made `@tasks` use `OhMyThreads.WithTaskLocals` automatically as an optimization. +- ![Enhancement][badge-enhancement] Uses of `@local` within `@tasks` no longer require users to declare the type of the task-local value; it can be inferred automatically if a type is not provided. - ![BREAKING][badge-breaking] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. (This is unlikely to break existing code but technically could because the type parameter has changed from `Bool` to `ChunkingMode`.) - ![Breaking][badge-breaking] `DynamicScheduler` and `StaticScheduler` don't support `nchunks=0` or `chunksize=0` any longer. Instead, chunking can now be turned off via an explicit new keyword argument `chunking=false`. - ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed). - ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped. +- ![BREAKING][badge-breaking] The default value for `ntasks`/`nchunks` for `DynamicScheduler` has been changed from `2*nthreads()` to `nthreads()`. With the new value we now align with `@threads :dynamic`. The old value wasn't giving good load balancing anyway and choosing a higher value penalizes uniform use cases even more. To get the old behavior, set `nchunks=2*nthreads()`.
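A short sketch (ours, not part of the original changelog, using only calls documented in the entries above) of the new keyword-forwarding API and of how to restore the pre-0.5 default task count:

```julia
using OhMyThreads: tmapreduce
using Base.Threads: nthreads

# select a scheduler via a Symbol and forward its options as keyword arguments
tmapreduce(sin, +, 1:10; chunksize = 2, scheduler = :static)

# restore the old DynamicScheduler default of 2*nthreads() tasks
tmapreduce(sin, +, 1:10^6; nchunks = 2 * nthreads())
```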
- ![Bugfix][badge-bugfix] When using the `GreedyScheduler` in combination with `tmapreduce` (or functions that build upon it) there could be non-deterministic errors in some cases (small input collection, not much work per element, see [#82](https://github.com/JuliaFolds2/OhMyThreads.jl/issues/82)). These cases should be fixed now. Version 0.4.6 diff --git a/README.md b/README.md index 612ef4ec..457e7e81 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,13 @@ focus on [data parallelism](https://en.wikipedia.org/wiki/Data_parallelism), tha ## Example ```julia -using OhMyThreads +using OhMyThreads: tmapreduce, @tasks +using BenchmarkTools: @btime +using Base.Threads: nthreads # Variant 1: function API -function mc_parallel(N; kw...) - M = tmapreduce(+, 1:N; kw...) do i +function mc_parallel(N; ntasks=nthreads()) + M = tmapreduce(+, 1:N; ntasks) do i rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -50,9 +52,12 @@ function mc_parallel(N; kw...) end # Variant 2: macro API -function mc_parallel_macro(N) +function mc_parallel_macro(N; ntasks=nthreads()) M = @tasks for i in 1:N - @set reducer=+ + @set begin + reducer=+ + ntasks=ntasks + end rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -62,19 +67,17 @@ end N = 100_000_000 mc_parallel(N) # gives, e.g., 3.14159924 -using BenchmarkTools - -@show Threads.nthreads() # 5 in this example - -@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread -@btime mc_parallel($N) # using all 5 threads +@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread) +@btime mc_parallel($N) # using all threads +@btime mc_parallel_macro($N) # using all threads ``` -Timings might be something like this: +With 5 threads, timings might be something like this: ``` -447.093 ms (7 allocations: 624 bytes) -89.401 ms (66 allocations: 5.72 KiB) +417.282 ms (14 allocations: 912 bytes) +83.578 ms (38 allocations: 3.08 KiB) +83.573 ms (38 allocations: 3.08 KiB) ``` (Check out the full [Parallel Monte Carlo](https://juliafolds2.github.io/OhMyThreads.jl/stable/literate/mc/mc/) example if you like.) diff --git a/docs/make.jl b/docs/make.jl index f2718244..4cb90c74 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -28,7 +28,7 @@ makedocs(; ] ], repo = "https://github.com/JuliaFolds2/OhMyThreads.jl/blob/{commit}{path}#{line}", - format = Documenter.HTML(repolink = "https://github.com/JuliaFolds2/OhMyThreads.jl")) + format = Documenter.HTML(repolink = "https://github.com/JuliaFolds2/OhMyThreads.jl"; collapselevel = 1)) if ci @info "Deploying documentation to GitHub" diff --git a/docs/src/index.md b/docs/src/index.md index 91ae22bb..2018efcc 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -14,11 +14,13 @@ to add the package to your Julia environment. ### Basic example ```julia -using OhMyThreads +using OhMyThreads: tmapreduce, @tasks +using BenchmarkTools: @btime +using Base.Threads: nthreads # Variant 1: function API -function mc_parallel(N; kw...) - M = tmapreduce(+, 1:N; kw...) do i +function mc_parallel(N; ntasks=nthreads()) + M = tmapreduce(+, 1:N; ntasks) do i rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -26,9 +28,12 @@ function mc_parallel(N; kw...) 
end # Variant 2: macro API -function mc_parallel_macro(N) +function mc_parallel_macro(N; ntasks=nthreads()) M = @tasks for i in 1:N - @set reducer=+ + @set begin + reducer=+ + ntasks=ntasks + end rand()^2 + rand()^2 < 1.0 end pi = 4 * M / N @@ -38,19 +43,17 @@ end N = 100_000_000 mc_parallel(N) # gives, e.g., 3.14159924 -using BenchmarkTools - -@show Threads.nthreads() # 5 in this example - -@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread -@btime mc_parallel($N) # using all 5 threads +@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread) +@btime mc_parallel($N) # using all threads +@btime mc_parallel_macro($N) # using all threads ``` -Timings might be something like this: +With 5 threads, timings might be something like this: ``` -447.093 ms (7 allocations: 624 bytes) -89.401 ms (66 allocations: 5.72 KiB) +417.282 ms (14 allocations: 912 bytes) +83.578 ms (38 allocations: 3.08 KiB) +83.573 ms (38 allocations: 3.08 KiB) ``` (Check out the full [Parallel Monte Carlo](@ref) example if you like.) diff --git a/docs/src/literate/integration/integration.jl b/docs/src/literate/integration/integration.jl index 0ec74dcb..8b5ef1d0 100644 --- a/docs/src/literate/integration/integration.jl +++ b/docs/src/literate/integration/integration.jl @@ -29,7 +29,6 @@ end # interval, as a multiple of the number of available Julia threads. using Base.Threads: nthreads -@show nthreads() N = nthreads() * 1_000_000 @@ -82,3 +81,5 @@ using BenchmarkTools # Because the problem is trivially parallel - all threads do the same thing and don't need # to communicate - we expect an ideal speedup of (close to) the number of available threads. + +nthreads() diff --git a/docs/src/literate/integration/integration.md b/docs/src/literate/integration/integration.md index 6d979b34..59bec287 100644 --- a/docs/src/literate/integration/integration.md +++ b/docs/src/literate/integration/integration.md @@ -46,13 +46,12 @@ interval, as a multiple of the number of available Julia threads. ````julia using Base.Threads: nthreads -@show nthreads() N = nthreads() * 1_000_000 ```` ```` -5000000 +10000000 ```` Calling `trapezoidal` we do indeed find the (approximate) value of $\pi$. @@ -101,6 +100,10 @@ end # end ```` +```` +trapezoidal_parallel (generic function with 1 method) +```` + First, we check the correctness of our parallel implementation. ````julia @@ -120,14 +123,22 @@ using BenchmarkTools ```` ```` - 12.782 ms (0 allocations: 0 bytes) - 2.563 ms (37 allocations: 3.16 KiB) + 24.348 ms (0 allocations: 0 bytes) + 2.457 ms (69 allocations: 6.05 KiB) ```` Because the problem is trivially parallel - all threads do the same thing and don't need to communicate - we expect an ideal speedup of (close to) the number of available threads. +````julia +nthreads() +```` + +```` +10 +```` + --- *This page was generated using [Literate.jl](https://github.com/fredrikekre/Literate.jl).* diff --git a/docs/src/literate/juliaset/juliaset.jl b/docs/src/literate/juliaset/juliaset.jl index 33b5c3ee..3a290792 100644 --- a/docs/src/literate/juliaset/juliaset.jl +++ b/docs/src/literate/juliaset/juliaset.jl @@ -111,12 +111,12 @@ img = zeros(Int, N, N) # the load balancing of the default dynamic scheduler. The latter divides the overall # workload into tasks that can then be dynamically distributed among threads to adjust the # per-thread load.
We can try to fine tune and improve the load balancing further by -# increasing the `nchunks` parameter of the scheduler, that is, creating more and smaller -# tasks. +# increasing the `ntasks` parameter of the scheduler, that is, creating more tasks with +# smaller per-task workload. using OhMyThreads: DynamicScheduler -@btime compute_juliaset_parallel!($img; scheduler=DynamicScheduler(; nchunks=N)) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; ntasks=N, scheduler=:dynamic) samples=10 evals=3; # Note that while this turns out to be a bit faster, it comes at the expense of much more # allocations. @@ -126,4 +126,4 @@ using OhMyThreads: DynamicScheduler using OhMyThreads: StaticScheduler -@btime compute_juliaset_parallel!($img; scheduler=StaticScheduler()) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; scheduler=:static) samples=10 evals=3; diff --git a/docs/src/literate/juliaset/juliaset.md b/docs/src/literate/juliaset/juliaset.md index 1ffb385b..eb7e9003 100644 --- a/docs/src/literate/juliaset/juliaset.md +++ b/docs/src/literate/juliaset/juliaset.md @@ -121,9 +121,9 @@ img = zeros(Int, N, N) ```` ```` -nthreads() = 5 - 138.157 ms (0 allocations: 0 bytes) - 40.373 ms (67 allocations: 6.20 KiB) +nthreads() = 10 + 131.295 ms (0 allocations: 0 bytes) + 31.422 ms (68 allocations: 6.09 KiB) ```` @@ -135,17 +135,17 @@ As stated above, the per-pixel computation is non-uniform. Hence, we do benefit the load balancing of the default dynamic scheduler. The latter divides the overall workload into tasks that can then be dynamically distributed among threads to adjust the per-thread load. We can try to fine tune and improve the load balancing further by -increasing the `nchunks` parameter of the scheduler, that is, creating more and smaller -tasks. +increasing the `ntasks` parameter of the scheduler, that is, creating more tasks with +smaller per-task workload. 
````julia using OhMyThreads: DynamicScheduler -@btime compute_juliaset_parallel!($img; scheduler=DynamicScheduler(; nchunks=N)) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; ntasks=N, scheduler=:dynamic) samples=10 evals=3; ```` ```` - 31.751 ms (12011 allocations: 1.14 MiB) + 17.438 ms (12018 allocations: 1.11 MiB) ```` @@ -158,11 +158,11 @@ To quantify the impact of load balancing we can opt out of dynamic scheduling an ````julia using OhMyThreads: StaticScheduler -@btime compute_juliaset_parallel!($img; scheduler=StaticScheduler()) samples=10 evals=3; +@btime compute_juliaset_parallel!($img; scheduler=:static) samples=10 evals=3; ```` ```` - 63.147 ms (37 allocations: 3.26 KiB) + 30.097 ms (73 allocations: 6.23 KiB) ```` diff --git a/docs/src/literate/mc/mc.jl b/docs/src/literate/mc/mc.jl index 89bea82d..6a9abd37 100644 --- a/docs/src/literate/mc/mc.jl +++ b/docs/src/literate/mc/mc.jl @@ -74,8 +74,8 @@ using Base.Threads: nthreads using OhMyThreads: StaticScheduler -@btime mc_parallel($N) samples=10 evals=3; -@btime mc_parallel($N; scheduler = StaticScheduler()) samples=10 evals=3; +@btime mc_parallel($N; scheduler=:dynamic) samples=10 evals=3; # default +@btime mc_parallel($N; scheduler=:static) samples=10 evals=3; # ## Manual parallelization # diff --git a/docs/src/literate/mc/mc.md b/docs/src/literate/mc/mc.md index 5696e5d9..44506abb 100644 --- a/docs/src/literate/mc/mc.md +++ b/docs/src/literate/mc/mc.md @@ -34,7 +34,7 @@ mc(N) ```` ```` -3.14145748 +3.14171236 ```` ## Parallelization with `tmapreduce` @@ -69,7 +69,7 @@ mc_parallel(N) ```` ```` -3.14134792 +3.14156496 ```` Let's run a quick benchmark. @@ -86,9 +86,9 @@ using Base.Threads: nthreads ```` ```` -nthreads() = 5 - 317.745 ms (0 allocations: 0 bytes) - 88.384 ms (66 allocations: 5.72 KiB) +nthreads() = 10 + 301.636 ms (0 allocations: 0 bytes) + 41.864 ms (68 allocations: 5.81 KiB) ```` @@ -100,13 +100,13 @@ and compare the performance of static and dynamic scheduling (with default param ````julia using OhMyThreads: StaticScheduler -@btime mc_parallel($N) samples=10 evals=3; -@btime mc_parallel($N; scheduler=StaticScheduler()) samples=10 evals=3; +@btime mc_parallel($N; scheduler=:dynamic) samples=10 evals=3; # default +@btime mc_parallel($N; scheduler=:static) samples=10 evals=3; ```` ```` - 88.222 ms (66 allocations: 5.72 KiB) - 88.203 ms (36 allocations: 2.98 KiB) + 41.839 ms (68 allocations: 5.81 KiB) + 41.838 ms (68 allocations: 5.81 KiB) ```` @@ -121,7 +121,7 @@ simulation. 
Finally, we fetch the results and compute the average estimate for $\pi$. using OhMyThreads: @spawn, chunks function mc_parallel_manual(N; nchunks = nthreads()) - tasks = map(chunks(1:N; n = nchunks)) do idcs # TODO: replace by `tmap` once ready + tasks = map(chunks(1:N; n = nchunks)) do idcs @spawn mc(length(idcs)) end pi = sum(fetch, tasks) / nchunks @@ -132,7 +132,7 @@ mc_parallel_manual(N) ```` ```` -3.1414609999999996 +3.14180504 ```` And this is the performance: @@ -142,7 +142,7 @@ ```` ```` - 64.042 ms (31 allocations: 2.80 KiB) + 30.224 ms (65 allocations: 5.70 KiB) ```` @@ -161,8 +161,8 @@ end samples=10 evals=3; ```` ```` - 88.041 ms (0 allocations: 0 bytes) - 63.427 ms (0 allocations: 0 bytes) + 41.750 ms (0 allocations: 0 bytes) + 30.148 ms (0 allocations: 0 bytes) ```` diff --git a/docs/src/literate/tls/tls.jl b/docs/src/literate/tls/tls.jl index 83ce93c0..20c77adc 100644 --- a/docs/src/literate/tls/tls.jl +++ b/docs/src/literate/tls/tls.jl @@ -172,9 +172,9 @@ using OhMyThreads: @tasks function matmulsums_tlv_macro(As, Bs; kwargs...) N = size(first(As), 1) - @tasks for i in eachindex(As,Bs) - @set collect=true - @local C::Matrix{Float64} = Matrix{Float64}(undef, N, N) + @tasks for i in eachindex(As, Bs) + @set collect = true + @local C = Matrix{Float64}(undef, N, N) mul!(C, As[i], Bs[i]) sum(C) end @@ -183,9 +183,10 @@ end res_tlv_macro = matmulsums_tlv_macro(As, Bs) res ≈ res_tlv_macro -# Here, `@local` expands to a pattern similar to the `TaskLocalValue` one above, although it -# carries some optimizations (see [`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task -# local values more efficient in loops which take on the order of 100ns to complete. +# Here, `@local` expands to a pattern similar to the `TaskLocalValue` one above, although it automatically +# infers that the object's type is `Matrix{Float64}`, and it carries some optimizations (see +# [`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task local values more efficient in +# loops which take on the order of 100ns to complete. # # # ### Benchmark @@ -210,23 +211,7 @@ sleep(2) #hide # As we can see, `matmulsums_tlv` (and `matmulsums_tlv_macro`) isn't only convenient # but also efficient: It allocates much less memory than `matmulsums_naive` and is about on # par with the manual implementation. - -# #### Tuning the scheduling # -# Since the workload is uniform, we don't need load balancing. We can thus try to improve -# the performance and reduce the number of allocations by choosing the number of chunks -# (i.e. tasks) to match the number of Julia threads. Concretely, this -# amounts to passing in `DynamicScheduler(; nchunks=nthreads())`. If we further want to -# opt-out of dynamic scheduling alltogether, we can choose the `StaticScheduler()`. - -using OhMyThreads: DynamicScheduler, StaticScheduler - -@btime matmulsums_tlv( - $As, $Bs; scheduler = $(DynamicScheduler(; nchunks = nthreads()))); -@btime matmulsums_tlv($As, $Bs; scheduler = $(StaticScheduler())); - -# Interestingly, this doesn't always lead to speedups (maybe even a slight slowdown). - # # ## Per-thread allocation # @@ -284,13 +269,14 @@ res_nu ≈ res_pt_naive # ### The quick fix (with caveats) # # A simple solution for the task-migration issue is to opt-out of dynamic scheduling with -# the `StaticScheduler()`. This scheduler statically assigns tasks to threads -# upfront without any dynamic rescheduling (the tasks are sticky and won't migrate). +# `scheduler=:static` (or `scheduler=StaticScheduler()`).
This scheduler statically +# assigns tasks to threads upfront without any dynamic rescheduling +# (the tasks are sticky and won't migrate). # function matmulsums_perthread_static(As, Bs) N = size(first(As), 1) Cs = [Matrix{Float64}(undef, N, N) for _ in 1:nthreads()] - tmap(As, Bs; scheduler = StaticScheduler()) do A, B + tmap(As, Bs; scheduler = :static) do A, B C = Cs[threadid()] mul!(C, A, B) sum(C) @@ -335,27 +321,21 @@ res_nu ≈ res_pt_channel # ### Benchmark # # Let's benchmark the variants above and compare them to the task-local implementation. -# We want to look at both `nchunks = nthreads()` and `nchunks > nthreads()`, the latter +# We want to look at both `ntasks = nthreads()` and `ntasks > nthreads()`, the latter # of which gives us dynamic load balancing. # -## no load balancing because nchunks == nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +## no load balancing because ntasks == nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu); @btime matmulsums_perthread_static($As_nu, $Bs_nu); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu); -## load balancing because nchunks > nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); +## load balancing because ntasks > nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 2 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 2 * nthreads()); -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 10 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 10 * nthreads()); # # Note that the runtime of `matmulsums_perthread_channel` improves with increasing number @@ -370,14 +350,15 @@ res_nu ≈ res_pt_channel # a limited number of tasks (e.g. `nthreads()`) with task-local buffers. # using OhMyThreads: tmapreduce + function matmulsums_perthread_channel_flipped(As, Bs; ntasks = nthreads()) N = size(first(As), 1) - chnl = Channel{Int}(length(As); spawn=true) do chnl + chnl = Channel{Int}(length(As); spawn = true) do chnl for i in 1:length(As) put!(chnl, i) end end - tmapreduce(vcat, 1:ntasks; scheduler = DynamicScheduler(; nchunks = 0)) do _ # we turn chunking off + tmapreduce(vcat, 1:ntasks; chunking=false) do _ # we turn chunking off local C = Matrix{Float64}(undef, N, N) map(chnl) do i # implicitly takes the values from the channel (parallel safe) A = As[i] @@ -407,7 +388,7 @@ sort(res_nu) ≈ sort(res_channel_flipped) # give [Bumper.jl](https://github.com/MasonProtter/Bumper.jl) a try. Essentially, it # allows you to *bring your own stacks*, that is, task-local bump allocators which you can # dynamically allocate memory to, and reset them at the end of a code block, just like -# Julia's stack. +# Julia's stack. # Be warned though that Bumper.jl is (1) a rather young package with (likely) some bugs # and (2) can easily lead to segfaults when used incorrectly. 
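To make the bump-allocator idea above concrete before the warning continues below, here is a minimal, hypothetical sketch (ours, not the `matmulsums_bumper` implementation from this example; it assumes only Bumper.jl's documented `@no_escape`/`@alloc` API):

```julia
using Bumper

function rowsums_bumper(A::Matrix{Float64})
    @no_escape begin
        # bump-allocate a temporary buffer on the task-local allocation stack;
        # everything allocated with @alloc is freed wholesale when the block exits
        tmp = @alloc(Float64, size(A, 1))
        tmp .= 0
        for j in axes(A, 2)
            tmp .+= @view A[:, j]
        end
        sum(tmp)  # only the scalar result leaves the block; `tmp` itself must not escape
    end
end
```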
If you can live with the # risk, Bumper.jl is especially useful in cases where we don't know ahead of time how large @@ -429,7 +410,7 @@ function matmulsums_bumper(As, Bs) end res_bumper = matmulsums_bumper(As, Bs); -sort(res_nu) ≈ sort(res_bumper) +sort(res) ≈ sort(res_bumper) @btime matmulsums_bumper($As, $Bs); diff --git a/docs/src/literate/tls/tls.md b/docs/src/literate/tls/tls.md index 09b62113..23894478 100644 --- a/docs/src/literate/tls/tls.md +++ b/docs/src/literate/tls/tls.md @@ -222,9 +222,9 @@ using OhMyThreads: @tasks function matmulsums_tlv_macro(As, Bs; kwargs...) N = size(first(As), 1) - @tasks for i in eachindex(As,Bs) - @set collect=true - @local C::Matrix{Float64} = Matrix{Float64}(undef, N, N) + @tasks for i in eachindex(As, Bs) + @set collect = true + @local C = Matrix{Float64}(undef, N, N) mul!(C, As[i], Bs[i]) sum(C) end @@ -238,9 +238,10 @@ res ≈ res_tlv_macro true ```` -Here, `@local` expands to a pattern similar to the `TaskLocalValue` one above, although it -carries some optimizations (see [`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task -local values more efficient in loops which take on the order of 100ns to complete. +Here, `@local` expands to a pattern similar to the `TaskLocalValue` one above, although it automatically +infers that the object's type is `Matrix{Float64}`, and it carries some optimizations (see +[`OhMyThreads.WithTaskLocals`](@ref)) which can make accessing task local values more efficient in +loops which take on the order of 100ns to complete. ### Benchmark @@ -262,11 +263,11 @@ using BenchmarkTools ```` ```` nthreads() = 10 - 49.077 ms (3 allocations: 518.17 KiB) - 32.658 ms (1691 allocations: 384.08 MiB) - 9.513 ms (200 allocations: 10.08 MiB) - 9.588 ms (236 allocations: 10.05 MiB) - 9.650 ms (239 allocations: 10.05 MiB) + 61.314 ms (3 allocations: 518.17 KiB) + 22.122 ms (1621 allocations: 384.06 MiB) + 7.620 ms (204 allocations: 10.08 MiB) + 7.702 ms (126 allocations: 5.03 MiB) + 7.600 ms (127 allocations: 5.03 MiB) ```` @@ -274,29 +275,6 @@ As we can see, `matmulsums_tlv` (and `matmulsums_tlv_macro`) isn't only convenie but also efficient: It allocates much less memory than `matmulsums_naive` and is about on par with the manual implementation. -#### Tuning the scheduling - -Since the workload is uniform, we don't need load balancing. We can thus try to improve -the performance and reduce the number of allocations by choosing the number of chunks -(i.e. tasks) to match the number of Julia threads. Concretely, this -amounts to passing in `DynamicScheduler(; nchunks=nthreads())`. If we further want to -opt-out of dynamic scheduling alltogether, we can choose the `StaticScheduler()`. - -````julia -using OhMyThreads: DynamicScheduler, StaticScheduler - -@btime matmulsums_tlv( - $As, $Bs; scheduler = $(DynamicScheduler(; nchunks = nthreads()))); -@btime matmulsums_tlv($As, $Bs; scheduler = $(StaticScheduler())); -```` - -```` - 9.561 ms (124 allocations: 5.03 MiB) - 9.618 ms (124 allocations: 5.03 MiB) - -```` - -Interestingly, this doesn't always lead to speedups (maybe even a slight slowdown). ## Per-thread allocation @@ -360,14 +338,15 @@ above, but you can't rely on it!) ### The quick fix (with caveats) A simple solution for the task-migration issue is to opt-out of dynamic scheduling with -the `StaticScheduler()`. This scheduler statically assigns tasks to threads -upfront without any dynamic rescheduling (the tasks are sticky and won't migrate). +`scheduler=:static` (or `scheduler=StaticScheduler()`).
This scheduler statically +assigns tasks to threads upfront without any dynamic rescheduling +(the tasks are sticky and won't migrate). ````julia function matmulsums_perthread_static(As, Bs) N = size(first(As), 1) Cs = [Matrix{Float64}(undef, N, N) for _ in 1:nthreads()] - tmap(As, Bs; scheduler = StaticScheduler()) do A, B + tmap(As, Bs; scheduler = :static) do A, B C = Cs[threadid()] mul!(C, A, B) sum(C) @@ -422,37 +401,31 @@ true ### Benchmark Let's benchmark the variants above and compare them to the task-local implementation. -We want to look at both `nchunks = nthreads()` and `nchunks > nthreads()`, the latter +We want to look at both `ntasks = nthreads()` and `ntasks > nthreads()`, the latter of which gives us dynamic load balancing. ````julia -# no load balancing because nchunks == nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +# no load balancing because ntasks == nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu); @btime matmulsums_perthread_static($As_nu, $Bs_nu); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = nthreads()))); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu); -# load balancing because nchunks > nthreads() -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 2 * nthreads()))); +# load balancing because ntasks > nthreads() +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 2 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 2 * nthreads()); -@btime matmulsums_tlv($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); -@btime matmulsums_perthread_channel($As_nu, $Bs_nu; - scheduler = $(DynamicScheduler(; nchunks = 10 * nthreads()))); +@btime matmulsums_tlv($As_nu, $Bs_nu; ntasks = 10 * nthreads()); +@btime matmulsums_perthread_channel($As_nu, $Bs_nu; ntasks = 10 * nthreads()); ```` ```` - 149.095 ms (124 allocations: 5.03 MiB) - 175.355 ms (107 allocations: 5.02 MiB) - 148.470 ms (112 allocations: 5.02 MiB) - 137.638 ms (235 allocations: 10.05 MiB) - 135.293 ms (183 allocations: 5.04 MiB) - 124.591 ms (1116 allocations: 50.13 MiB) - 124.716 ms (744 allocations: 5.10 MiB) + 170.563 ms (126 allocations: 5.03 MiB) + 165.647 ms (108 allocations: 5.02 MiB) + 172.216 ms (114 allocations: 5.02 MiB) + 108.662 ms (237 allocations: 10.05 MiB) + 114.673 ms (185 allocations: 5.04 MiB) + 97.933 ms (1118 allocations: 50.13 MiB) + 96.868 ms (746 allocations: 5.10 MiB) ```` @@ -469,14 +442,15 @@ a limited number of tasks (e.g. `nthreads()`) with task-local buffers. 
````julia using OhMyThreads: tmapreduce + function matmulsums_perthread_channel_flipped(As, Bs; ntasks = nthreads()) N = size(first(As), 1) - chnl = Channel{Int}(length(As); spawn=true) do chnl + chnl = Channel{Int}(length(As); spawn = true) do chnl for i in 1:length(As) put!(chnl, i) end end - tmapreduce(vcat, 1:ntasks; scheduler = DynamicScheduler(; nchunks = 0)) do _ # we turn chunking off + tmapreduce(vcat, 1:ntasks; chunking=false) do _ # we turn chunking off local C = Matrix{Float64}(undef, N, N) map(chnl) do i # implicitly takes the values from the channel (parallel safe) A = As[i] @@ -510,9 +484,9 @@ Quick benchmark: ```` ```` - 121.715 ms (163 allocations: 5.07 MiB) - 122.457 ms (267 allocations: 10.11 MiB) - 122.374 ms (1068 allocations: 50.37 MiB) + 94.389 ms (170 allocations: 5.07 MiB) + 94.580 ms (271 allocations: 10.10 MiB) + 94.768 ms (1071 allocations: 50.41 MiB) ```` @@ -545,13 +519,13 @@ function matmulsums_bumper(As, Bs) end res_bumper = matmulsums_bumper(As, Bs); -sort(res_nu) ≈ sort(res_bumper) +sort(res) ≈ sort(res_bumper) @btime matmulsums_bumper($As, $Bs); ```` ```` - 9.865 ms (254 allocations: 50.92 KiB) + 7.814 ms (134 allocations: 27.92 KiB) ```` diff --git a/docs/src/translation.md b/docs/src/translation.md index f4775576..a5a28f7d 100644 --- a/docs/src/translation.md +++ b/docs/src/translation.md @@ -45,7 +45,7 @@ end # or -tforeach(1:10; scheduler=StaticScheduler()) do i +tforeach(1:10; scheduler=:static) do i println(i) end ``` @@ -62,13 +62,13 @@ end ```julia # OhMyThreads @tasks for i in 1:10 - @set scheduler=DynamicScheduler(; nchunks=0) # turn off chunking + @set chunking=false println(i) end # or -tforeach(1:10; scheduler=DynamicScheduler(; nchunks=0)) do i +tforeach(1:10; chunking=false) do i println(i) end ``` diff --git a/src/functions.jl b/src/functions.jl index 4b767ca3..97436ac0 100644 --- a/src/functions.jl +++ b/src/functions.jl @@ -1,6 +1,6 @@ """ tmapreduce(f, op, A::AbstractArray...; - [scheduler::Scheduler = DynamicScheduler()], + [scheduler::Union{Scheduler, Symbol} = :dynamic], [outputtype::Type = Any], [init]) @@ -27,15 +27,23 @@ is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. -- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. -- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. +- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. +- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation. 
+ +In addition, `tmapreduce` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tmapreduce(√, +, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tmapreduce end """ treducemap(op, f, A::AbstractArray...; - [scheduler::Scheduler = DynamicScheduler()], + [scheduler::Union{Scheduler, Symbol} = :dynamic], [outputtype::Type = Any], [init]) @@ -52,7 +60,7 @@ will get undefined results. ## Example: ``` -tmapreduce(√, +, [1, 2, 3, 4, 5]) +treducemap(+, √, [1, 2, 3, 4, 5]) ``` is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form @@ -63,15 +71,23 @@ is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. -- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. -- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. +- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. +- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation. + +In addition, `treducemap` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +treducemap(+, √, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function treducemap end """ treduce(op, A::AbstractArray...; - [scheduler::Scheduler = DynamicScheduler()], + [scheduler::Union{Scheduler, Symbol} = :dynamic], [outputtype::Type = Any], [init]) @@ -97,15 +113,23 @@ is the parallelized version of `sum([1, 2, 3, 4, 5])` in the form ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. -- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. -- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation. 
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. +- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument. +- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation. + +In addition, `treduce` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +treduce(+, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function treduce end """ tforeach(f, A::AbstractArray...; - [schedule::Scheduler = DynamicScheduler()]) :: Nothing + [scheduler::Union{Scheduler, Symbol} = :dynamic]) :: Nothing A multithreaded function like `Base.foreach`. Apply `f` to each element of `A` on multiple parallel tasks, and return `nothing`. I.e. it is the parallel equivalent of @@ -126,13 +150,23 @@ end ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tforeach` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tforeach(1:10; chunksize=2, scheduler=:static) do i + println(i^2) +end +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tforeach end """ tmap(f, [OutputElementType], A::AbstractArray...; - [schedule::Scheduler = DynamicScheduler()]) + [scheduler::Union{Scheduler, Symbol} = :dynamic]) A multithreaded function like `Base.map`. Create a new container `similar` to `A` and fills it in parallel such that the `i`th element is equal to `f(A[i])`. @@ -149,26 +183,39 @@ tmap(sin, 1:10) ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tmap` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tmap(sin, 1:10; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`).
""" function tmap end """ tmap!(f, out, A::AbstractArray...; - [schedule::Scheduler = DynamicScheduler()]) + [schedule::Union{Scheduler, Symbol} = :dynamic]) A multithreaded function like `Base.map!`. In parallel on multiple tasks, this function assigns each element of `out[i] = f(A[i])` for each index `i` of `A` and `out`. ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tmap!` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tmap! end """ tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}}; - [schedule::Scheduler = DynamicScheduler()]) + [schedule::Union{Scheduler, Symbol} = :dynamic]) A multithreaded function like `Base.collect`. Essentially just calls `tmap` on the generator function and inputs. @@ -185,6 +232,14 @@ tcollect(sin(i) for i in 1:10) ## Keyword arguments: -- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information. +- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers. + +In addition, `tcollect` accepts **all keyword arguments that are supported by the selected +scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example: +``` +tcollect(sin(i) for i in 1:10; chunksize=2, scheduler=:static) +``` +However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`** +(but not for `scheduler::Scheduler`). """ function tcollect end diff --git a/src/implementation.jl b/src/implementation.jl index c0cbf517..b018f0a4 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -1,20 +1,22 @@ module Implementation import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcollect - using OhMyThreads: chunks, @spawn, @spawnat, WithTaskLocals, promise_task_local using OhMyThreads.Tools: nthtid -using OhMyThreads: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, +using OhMyThreads: Scheduler, + DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler -using OhMyThreads.Schedulers: chunking_enabled, chunking_mode, ChunkingMode, NoChunking, - FixedSize, FixedCount +using OhMyThreads.Schedulers: chunking_enabled, + chunking_mode, ChunkingMode, NoChunking, + FixedSize, FixedCount, scheduler_from_symbol, NotGiven, + isgiven using Base: @propagate_inbounds using Base.Threads: nthreads, @threads - using BangBang: append!! 
- using ChunkSplitters: ChunkSplitters +const MaybeScheduler = Union{NotGiven, Scheduler, Symbol} + include("macro_impl.jl") function auto_disable_chunking_warning() @@ -24,33 +26,52 @@ function auto_disable_chunking_warning() "or pass in `collect(chunks(...))`.") end -function _chunks(sched, arg; kwargs...) +function _chunks(sched, arg) C = chunking_mode(sched) @assert C != NoChunking if C == FixedCount - chunks(arg; n = sched.nchunks, split = sched.split, kwargs...) + chunks(arg; + n = sched.nchunks, + split = sched.split)::ChunkSplitters.Chunk{ + typeof(arg), ChunkSplitters.FixedCount} elseif C == FixedSize - chunks(arg; size = sched.chunksize, split = sched.split, kwargs...) + chunks(arg; + size = sched.chunksize, + split = sched.split)::ChunkSplitters.Chunk{ + typeof(arg), ChunkSplitters.FixedSize} + end +end + +function _scheduler_from_userinput(scheduler::MaybeScheduler; kwargs...) + if scheduler isa Scheduler + isempty(kwargs) || scheduler_and_kwargs_err(; kwargs...) + _scheduler = scheduler + elseif scheduler isa Symbol + _scheduler = scheduler_from_symbol(scheduler; kwargs...) + else # default fallback + _scheduler = DynamicScheduler(; kwargs...) end end function tmapreduce(f, op, Arrs...; - scheduler::Scheduler = DynamicScheduler(), + scheduler::MaybeScheduler = NotGiven(), outputtype::Type = Any, - mapreduce_kwargs...) - min_kwarg_len = haskey(mapreduce_kwargs, :init) ? 1 : 0 - if length(mapreduce_kwargs) > min_kwarg_len - tmapreduce_kwargs_err(; mapreduce_kwargs...) - end - if scheduler isa SerialScheduler + init = NotGiven(), + kwargs...) + mapreduce_kwargs = isgiven(init) ? (; init) : (;) + _scheduler = _scheduler_from_userinput(scheduler; kwargs...) + + # @show _scheduler + if _scheduler isa SerialScheduler mapreduce(f, op, Arrs...; mapreduce_kwargs...) else - @noinline _tmapreduce(f, op, Arrs, outputtype, scheduler, mapreduce_kwargs) + @noinline _tmapreduce(f, op, Arrs, outputtype, _scheduler, mapreduce_kwargs) end end -@noinline function tmapreduce_kwargs_err(; init = nothing, kwargs...) - error("got unsupported keyword arguments: $((;kwargs...,)) ") +@noinline function scheduler_and_kwargs_err(; kwargs...) + kwargstr = join(string.(keys(kwargs)), ", ") + throw(ArgumentError("Providing an explicit scheduler as well as direct keyword arguments (e.g. $(kwargstr)) is currently not supported.")) end treducemap(op, f, A...; kwargs...) = tmapreduce(f, op, A...; kwargs...) @@ -103,7 +124,7 @@ function _tmapreduce(f, Arrs, ::Type{OutputType}, scheduler::StaticScheduler, - mapreduce_kwargs) where {OutputType} + mapreduce_kwargs)::OutputType where {OutputType} nt = nthreads() check_all_have_same_indices(Arrs) if chunking_enabled(scheduler) @@ -112,8 +133,8 @@ function _tmapreduce(f, args = map(A -> view(A, inds), Arrs) # Note, calling `promise_task_local` here is only safe because we're assuming that # Base.mapreduce isn't going to magically try to do multithreading on us... - @spawnat tid mapreduce( - promise_task_local(f), promise_task_local(op), args...; mapreduce_kwargs...) + @spawnat tid mapreduce(promise_task_local(f), promise_task_local(op), args...; + mapreduce_kwargs...) end # Note, calling `promise_task_local` here is only safe because we're assuming that # Base.mapreduce isn't going to magically try to do multithreading on us... 
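A quick usage sketch (ours) of how `_scheduler_from_userinput` above resolves the three kinds of user input; only behavior implemented in that function and in `scheduler_and_kwargs_err` above is assumed:

```julia
using OhMyThreads: tmapreduce, StaticScheduler

# equivalent ways to request static scheduling with chunks of size 2:
tmapreduce(√, +, 1:10; scheduler = StaticScheduler(; chunksize = 2))  # explicit Scheduler object
tmapreduce(√, +, 1:10; scheduler = :static, chunksize = 2)            # Symbol plus forwarded kwargs

# no scheduler given: kwargs go to the default DynamicScheduler constructor
tmapreduce(√, +, 1:10; chunksize = 2)

# mixing an explicit Scheduler with scheduler-related kwargs is an ArgumentError:
# tmapreduce(√, +, 1:10; scheduler = StaticScheduler(), chunksize = 2)
```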
@@ -136,7 +157,7 @@ function _tmapreduce(f, Arrs::Tuple{ChunkSplitters.Chunk{T}}, # we don't support multiple chunks for now ::Type{OutputType}, scheduler::StaticScheduler, - mapreduce_kwargs) where {OutputType, T} + mapreduce_kwargs)::OutputType where {OutputType, T} chunking_enabled(scheduler) && auto_disable_chunking_warning() check_all_have_same_indices(Arrs) chnks = only(Arrs) @@ -248,33 +269,37 @@ end function tmap(f, A::Union{AbstractArray, ChunkSplitters.Chunk}, _Arrs::AbstractArray...; - scheduler::Scheduler = DynamicScheduler(), + scheduler::MaybeScheduler = NotGiven(), kwargs...) - if scheduler isa GreedyScheduler + _scheduler = _scheduler_from_userinput(scheduler; kwargs...) + + if _scheduler isa GreedyScheduler error("Greedy scheduler isn't supported with `tmap` unless you provide an `OutputElementType` argument, since the greedy schedule requires a commutative reducing operator.") end - if chunking_enabled(scheduler) && hasfield(typeof(scheduler), :split) && - scheduler.split != :batch - error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $scheduler)") + if chunking_enabled(_scheduler) && hasfield(typeof(_scheduler), :split) && + _scheduler.split != :batch + error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $_scheduler)") end - if A isa ChunkSplitters.Chunk && chunking_enabled(scheduler) + if A isa ChunkSplitters.Chunk && chunking_enabled(_scheduler) auto_disable_chunking_warning() - if scheduler isa DynamicScheduler - scheduler = DynamicScheduler(; nchunks = 0, scheduler.threadpool) - elseif scheduler isa StaticScheduler - scheduler = StaticScheduler(; nchunks = 0) + if _scheduler isa DynamicScheduler + _scheduler = DynamicScheduler(; + threadpool = _scheduler.threadpool, + chunking = false) + elseif _scheduler isa StaticScheduler + _scheduler = StaticScheduler(; chunking = false) else error("Can't disable chunking for this scheduler?! Shouldn't be reached.", - scheduler) + _scheduler) end end Arrs = (A, _Arrs...) - if scheduler isa SerialScheduler + if _scheduler isa SerialScheduler map(f, Arrs...; kwargs...) else check_all_have_same_indices(Arrs) - @noinline _tmap(scheduler, f, A, _Arrs...; kwargs...) + @noinline _tmap(_scheduler, f, A, _Arrs...) end end @@ -282,8 +307,7 @@ end function _tmap(scheduler::DynamicScheduler{NoChunking}, f, A::AbstractArray, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...;) (; threadpool) = scheduler Arrs = (A, _Arrs...) tasks = map(eachindex(A)) do i @@ -300,8 +324,7 @@ end function _tmap(scheduler::DynamicScheduler{NoChunking}, f, A::ChunkSplitters.Chunk, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...) (; threadpool) = scheduler tasks = map(A) do idcs @spawn threadpool promise_task_local(f)(idcs) @@ -313,8 +336,7 @@ end function _tmap(scheduler::StaticScheduler{NoChunking}, f, A::ChunkSplitters.Chunk, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...) nt = nthreads() tasks = map(enumerate(A)) do (c, idcs) tid = @inbounds nthtid(mod1(c, nt)) @@ -327,8 +349,7 @@ end function _tmap(scheduler::StaticScheduler{NoChunking}, f, A::AbstractArray, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...;) Arrs = (A, _Arrs...) nt = nthreads() tasks = map(enumerate(A)) do (c, i) @@ -346,8 +367,7 @@ end function _tmap(scheduler::Scheduler, f, A::AbstractArray, - _Arrs::AbstractArray...; - kwargs...) + _Arrs::AbstractArray...) Arrs = (A, _Arrs...) 
idcs = collect(_chunks(scheduler, A)) reduction_f = append!! @@ -357,7 +377,7 @@ function _tmap(scheduler::Scheduler, map(f, args...) end end - v = tmapreduce(mapping_f, reduction_f, idcs; scheduler, kwargs...) + v = tmapreduce(mapping_f, reduction_f, idcs; scheduler) reshape(v, size(A)...) end @@ -365,14 +385,16 @@ end out, A::AbstractArray, _Arrs::AbstractArray...; - scheduler::Scheduler = DynamicScheduler(), + scheduler::MaybeScheduler = NotGiven(), kwargs...) - if hasfield(typeof(scheduler), :split) && scheduler.split != :batch - error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $scheduler)") + _scheduler = _scheduler_from_userinput(scheduler; kwargs...) + + if hasfield(typeof(_scheduler), :split) && _scheduler.split != :batch + error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $_scheduler)") end Arrs = (A, _Arrs...) - if scheduler isa SerialScheduler - map!(f, out, Arrs...; kwargs...) + if _scheduler isa SerialScheduler + map!(f, out, Arrs...) else @boundscheck check_all_have_same_indices((out, Arrs...)) mapping_f = maybe_rewrap(f) do f @@ -382,7 +404,7 @@ end out[i] = res end end - @noinline tforeach(mapping_f, eachindex(out); scheduler, kwargs...) + @noinline tforeach(mapping_f, eachindex(out); scheduler = _scheduler) out end end diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 74ce0287..b4e74a58 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -27,35 +27,49 @@ function tasks_macro(forex) make_mapping_function = if isempty(tls_names) :(local function mapping_function($itvar,) - $(forbody) - end) + $(forbody) + end) else :(local mapping_function = WithTaskLocals(($(tls_names...),)) do ($(locals_names...),) - function mapping_function_local($itvar,) - $(forbody) - end - end) + function mapping_function_local($itvar,) + $(forbody) + end + end) end - q = if !isnothing(settings.reducer) + q = if isgiven(settings.reducer) quote $make_mapping_function - tmapreduce(mapping_function, $(settings.reducer), $(itrng); scheduler = $(settings.scheduler)) + tmapreduce(mapping_function, $(settings.reducer), + $(itrng)) end - elseif settings.collect + elseif isgiven(settings.collect) maybe_warn_useless_init(settings) quote $make_mapping_function - tmap(mapping_function, $(itrng); scheduler = $(settings.scheduler)) + tmap(mapping_function, $(itrng)) end else maybe_warn_useless_init(settings) quote $make_mapping_function - tforeach(mapping_function, $(itrng); scheduler = $(settings.scheduler)) + tforeach(mapping_function, $(itrng)) end end + # insert keyword arguments into the function call + kwexpr = :($(Expr(:parameters))) + if isgiven(settings.scheduler) + push!(kwexpr.args, Expr(:kw, :scheduler, settings.scheduler)) + end + if isgiven(settings.init) + push!(kwexpr.args, Expr(:kw, :init, settings.init)) + end + for (k, v) in settings.kwargs + push!(kwexpr.args, Expr(:kw, k, v)) + end + insert!(q.args[4].args, 2, kwexpr) + # wrap everything in a let ... end block # and, potentially, define the `TaskLocalValue`s. result = :(let @@ -71,27 +85,17 @@ function tasks_macro(forex) end function maybe_warn_useless_init(settings) - !isnothing(settings.init) && + isgiven(settings.init) && @warn("The @set init = ... 
setting won't have any effect because no reduction is performed.") end Base.@kwdef mutable struct Settings - scheduler::Expr = :(DynamicScheduler()) - reducer::Union{Expr, Symbol, Nothing} = nothing - collect::Bool = false - init::Union{Expr, Symbol, Nothing} = nothing -end - -function _sym2scheduler(s) - if s == :dynamic - :(DynamicScheduler()) - elseif s == :static - :(StaticScheduler()) - elseif s == :greedy - :(GreedyScheduler()) - else - throw(ArgumentError("Unknown scheduler symbol.")) - end + # scheduler::Expr = :(DynamicScheduler()) + scheduler::Union{Expr, QuoteNode, NotGiven} = NotGiven() + reducer::Union{Expr, Symbol, NotGiven} = NotGiven() + collect::Union{Bool, NotGiven} = NotGiven() + init::Union{Expr, Symbol, NotGiven} = NotGiven() + kwargs::Vector{Pair{Symbol, Any}} = Pair{Symbol, Any}[] end function _maybe_handle_atlocal_block!(args) @@ -119,7 +123,7 @@ function _unfold_atlocal_block(ex) for x in tlsexprs localb, localn = _atlocal_assign_to_exprs(x) push!(locals_before, localb) - push!(locals_names, localn) + push!(locals_names, localn) end else throw(ErrorException("Wrong usage of @local. You must either provide a typed assignment or multiple typed assignments in a `begin ... end` block.")) end return locals_before, locals_names end +#= +If the TLS doesn't have a declared return type, we're going to use `CC.return_type` to get it +automatically. This would normally be non-kosher, but it's okay here for three reasons: +1) The task local value *only* exists within the function being called, meaning that the world age +is frozen for the full lifetime of the TLV, so an `eval` can't change the outcome or cause incorrect inference. +2) We do not allow users to *write* to the task local value, they can only retrieve its value, so there are no +potential problems from the type being maximally narrow and the user then trying to write a value of another type to it. +3) The task local value is not user-observable. We never let the user inspect its type, unless they themselves are +using `code____` tools to inspect the generated code, hence if inference changes and gives a more or less precise +type, there are no observable semantic changes, just performance increases or decreases. +=# function _atlocal_assign_to_exprs(ex) left_ex = ex.args[1] - if left_ex isa Symbol || left_ex.head != :(::) - throw(ErrorException("Wrong usage of @local. Expected typed assignment, e.g.
`A::Matrix{Float} = rand(2,2)`.")) - end - tls_sym = esc(left_ex.args[1]) - tls_type = esc(left_ex.args[2]) tls_def = esc(ex.args[2]) @gensym tl_storage - local_before = :($(tl_storage) = TaskLocalValue{$tls_type}(() -> $(tls_def))) + if Base.isexpr(left_ex, :(::)) + tls_sym = esc(left_ex.args[1]) + tls_type = esc(left_ex.args[2]) + local_before = :($(tl_storage) = TaskLocalValue{$tls_type}(() -> $(tls_def))) + else + tls_sym = esc(left_ex) + local_before = :($(tl_storage) = let f = () -> $(tls_def) + TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f) + end) + end local_name = :($(tls_sym)) return local_before, local_name end + function _maybe_handle_atset_block!(settings, args) idcs = findall(args) do arg arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set") @@ -159,7 +179,7 @@ function _maybe_handle_atset_block!(settings, args) end deleteat!(args, idcs) # check incompatible settings - if settings.collect && !isnothing(settings.reducer) + if isgiven(settings.collect) && settings.collect && isgiven(settings.reducer) throw(ArgumentError("Specifying both collect and reducer isn't supported.")) end end @@ -169,20 +189,15 @@ function _handle_atset_single_assign!(settings, ex) throw(ErrorException("Wrong usage of @set. Expected assignment, e.g. `scheduler = StaticScheduler()`.")) end sym = ex.args[1] - if !hasfield(Settings, sym) - throw(ArgumentError("Unknown setting \"$(sym)\". Must be ∈ $(fieldnames(Settings)).")) - end def = ex.args[2] - if sym == :collect && !(def isa Bool) - throw(ArgumentError("Setting collect can only be true or false.")) - #TODO support specifying the OutputElementType - end - def = if def isa QuoteNode - _sym2scheduler(def.value) - elseif def isa Bool - def + if hasfield(Settings, sym) + if sym == :collect && !(def isa Bool) + throw(ArgumentError("Setting collect can only be true or false.")) + #TODO support specifying the OutputElementType + end + def = def isa Bool ? def : esc(def) + setfield!(settings, sym, def) else - esc(def) + push!(settings.kwargs, sym => esc(def)) end - setfield!(settings, sym, def) end diff --git a/src/macros.jl b/src/macros.jl index fd3669c1..87bdf2e1 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -35,17 +35,27 @@ end end ``` +```julia +@tasks for i in 1:100 + @set ntasks=4*nthreads() + # non-uniform work... +end +``` + ```julia @tasks for i in 1:5 @set scheduler=:static println("i=", i, " → ", threadid()) end - ``` + ```julia @tasks for i in 1:100 - @set scheduler=DynamicScheduler(; nchunks=4*nthreads()) - # non-uniform work... + @set begin + scheduler=:static + chunksize=10 + end + println("i=", i, " → ", threadid()) end ``` """ @@ -64,10 +74,18 @@ Multiple settings are supported, either as separate `@set` statements or via ## Settings -* `scheduler` (e.g. `scheduler=:static`): Can be either a [`Scheduler`](@ref) or a `Symbol` (e.g. `:dynamic` or `:static`) * `reducer` (e.g. `reducer=+`): Indicates that a reduction should be performed with the provided binary function. See [`tmapreduce`](@ref) for more information. * `collect` (e.g. `collect=true`): Indicates that results should be collected (similar to `map`). + +All other settings will be passed on to the underlying parallel functions (e.g. [tmapreduce](@ref)) +as keyword arguments. Hence, you may provide whatever these functions accept as +keyword arguments. Among others, this includes + +* `scheduler` (e.g. `scheduler=:static`): Can be either a [`Scheduler`](@ref) or a `Symbol` (e.g. `:dynamic`, `:static`, `:serial`, or `:greedy`). * `init` (e.g. 
diff --git a/src/schedulers.jl b/src/schedulers.jl
index 7c4dc962..bc87cc5f 100644
--- a/src/schedulers.jl
+++ b/src/schedulers.jl
@@ -2,6 +2,14 @@ module Schedulers
 
 using Base.Threads: nthreads
 
+# Used to indicate that a keyword argument has not been set by the user.
+# We don't use Nothing because nothing may sometimes be a valid user input (e.g. for `init`)
+struct NotGiven end
+isgiven(::NotGiven) = false
+isgiven(::T) where {T} = true
+
+const MaybeInteger = Union{Integer, NotGiven}
+
 """
 Supertype for all available schedulers:
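A small self-contained illustration of the `NotGiven` sentinel pattern introduced above (the function `f` is hypothetical, for illustration only):

```julia
struct NotGiven end
isgiven(::NotGiven) = false
isgiven(::Any) = true

# `nothing` remains available as a legitimate user value (e.g. for `init`),
# while `NotGiven()` unambiguously means "the user didn't pass this keyword".
function f(; nchunks::Union{Integer, NotGiven} = NotGiven())
    return isgiven(nchunks) ? Int(nchunks) : 4  # fall back to a default
end

f()             # 4
f(nchunks = 2)  # 2
```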
@@ -29,6 +37,8 @@ function _chunkingstr(s::Scheduler)
 end
 
 """
+    DynamicScheduler (aka :dynamic)
+
 The default dynamic scheduler. Divides the given collection into chunks and
 then spawns a task per chunk to perform the requested operation in parallel.
 The tasks are assigned to threads by Julia's dynamic scheduler and are non-sticky, that is,
@@ -39,20 +49,20 @@ with other multithreaded code.
 
 ## Keyword arguments:
 
-- `nchunks::Integer` (default `2 * nthreads(threadpool)`):
+- `nchunks::Integer` or `ntasks::Integer` (default `nthreads(threadpool)`):
     * Determines the number of chunks (and thus also the number of parallel tasks).
     * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. For `nchunks <= nthreads()` there are not enough chunks for any load balancing.
     * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads.
 - `chunksize::Integer` (default not set)
     * Specifies the desired chunk size (instead of the number of chunks).
-    * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer).
+    * The options `chunksize` and `nchunks`/`ntasks` are **mutually exclusive** (only one may be a positive integer).
 - `split::Symbol` (default `:batch`):
     * Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained.
     * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
     * Beware that for `split=:scatter` the order of elements isn't maintained and a reducer function must not only be associative but also **commutative**!
 - `chunking::Bool` (default `true`):
     * Controls whether input elements are grouped into chunks (`true`) or not (`false`).
-    * For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
+    * For `chunking=false`, the arguments `nchunks`/`ntasks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
 - `threadpool::Symbol` (default `:default`):
     * Possible options are `:default` and `:interactive`.
     * The high-priority pool `:interactive` should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes.
@@ -63,8 +73,8 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler
     chunksize::Int
     split::Symbol
 
-    function DynamicScheduler(
-        threadpool::Symbol, nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true)
+    function DynamicScheduler(threadpool::Symbol, nchunks::Integer, chunksize::Integer,
+            split::Symbol; chunking::Bool = true)
         if !(threadpool in (:default, :interactive))
             throw(ArgumentError("threadpool must be either :default or :interactive"))
         end
@@ -72,10 +82,10 @@ struct DynamicScheduler{C <: ChunkingMode} <: Scheduler
             C = NoChunking
         else
             if !(nchunks > 0 || chunksize > 0)
-                throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false)."))
+                throw(ArgumentError("Either nchunks/ntasks or chunksize must be a positive integer (or chunking=false)."))
             end
             if nchunks > 0 && chunksize > 0
-                throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer"))
+                throw(ArgumentError("nchunks/ntasks and chunksize are mutually exclusive and only one of them may be a positive integer"))
             end
             C = chunksize > 0 ? FixedSize : FixedCount
         end
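A usage sketch for the new `ntasks` alias (equivalent to `nchunks`), based on the constructor logic in this hunk:

```julia
using OhMyThreads: tmapreduce, DynamicScheduler

s1 = DynamicScheduler(; ntasks = 8)   # new alias
s2 = DynamicScheduler(; nchunks = 8)  # equivalent spelling
# DynamicScheduler(; ntasks = 8, nchunks = 8)  # would throw: they are aliases

tmapreduce(sin, +, 1:10_000; scheduler = s1)
```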
@@ -85,8 +95,9 @@ end
 
 function DynamicScheduler(;
         threadpool::Symbol = :default,
-        nchunks::Union{Integer, Nothing} = nothing,
-        chunksize::Union{Integer, Nothing} = nothing,
+        nchunks::MaybeInteger = NotGiven(),
+        ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
+        chunksize::MaybeInteger = NotGiven(),
         chunking::Bool = true,
         split::Symbol = :batch)
     if !chunking
@@ -94,25 +105,31 @@ function DynamicScheduler(;
         chunksize = -1
     else
         # only choose nchunks default if chunksize hasn't been specified
-        if isnothing(nchunks) && isnothing(chunksize)
-            nchunks = 2 * nthreads(threadpool)
+        if !isgiven(nchunks) && !isgiven(chunksize) && !isgiven(ntasks)
+            nchunks = nthreads(threadpool)
             chunksize = -1
         else
-            nchunks = isnothing(nchunks) ? -1 : nchunks
-            chunksize = isnothing(chunksize) ? -1 : chunksize
+            if isgiven(nchunks) && isgiven(ntasks)
+                throw(ArgumentError("nchunks and ntasks are aliases and only one may be provided"))
+            end
+            nchunks = isgiven(nchunks) ? nchunks :
+                      isgiven(ntasks) ? ntasks : -1
+            chunksize = isgiven(chunksize) ? chunksize : -1
         end
     end
     DynamicScheduler(threadpool, nchunks, chunksize, split; chunking)
 end
 
 function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::DynamicScheduler)
-    print("DynamicScheduler", "\n")
+    print(io, "DynamicScheduler", "\n")
     cstr = _chunkingstr(s)
     println(io, "├ Chunking: ", cstr)
     print(io, "└ Threadpool: ", s.threadpool)
 end
 
 """
+    StaticScheduler (aka :static)
+
 A static low-overhead scheduler. Divides the given collection into chunks and
 then spawns a task per chunk to perform the requested operation in parallel.
 The tasks are statically assigned to threads up front and are made *sticky*, that is,
@@ -124,16 +141,16 @@ Isn't well composable with other multithreaded code though.
 
 ## Keyword arguments:
 
-- `nchunks::Integer` (default `nthreads()`):
+- `nchunks::Integer` or `ntasks::Integer` (default `nthreads()`):
     * Determines the number of chunks (and thus also the number of parallel tasks).
     * Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads.
     * For `nchunks > nthreads()` the chunks will be distributed to the available threads in a round-robin fashion.
 - `chunksize::Integer` (default not set)
     * Specifies the desired chunk size (instead of the number of chunks).
-    * The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be non-zero).
+    * The options `chunksize` and `nchunks`/`ntasks` are **mutually exclusive** (only one may be non-zero).
 - `chunking::Bool` (default `true`):
     * Controls whether input elements are grouped into chunks (`true`) or not (`false`).
-    * For `chunking=false`, the arguments `nchunks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
+    * For `chunking=false`, the arguments `nchunks`/`ntasks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
 - `split::Symbol` (default `:batch`):
     * Determines how the collection is divided into chunks. By default, each chunk consists of contiguous elements and order is maintained.
     * See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
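A sketch of the `chunking = false` mode described in these docstrings; note that it spawns one task per input element, so keep the input small:

```julia
using OhMyThreads: tforeach, StaticScheduler

tforeach(1:8; scheduler = StaticScheduler(; chunking = false)) do i
    println("element ", i, " runs in its own task")
end
```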
@@ -144,16 +161,16 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler
     chunksize::Int
     split::Symbol
 
-    function StaticScheduler(
-        nchunks::Integer, chunksize::Integer, split::Symbol; chunking::Bool = true)
+    function StaticScheduler(nchunks::Integer, chunksize::Integer, split::Symbol;
+            chunking::Bool = true)
         if !chunking
             C = NoChunking
         else
             if !(nchunks > 0 || chunksize > 0)
-                throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false)."))
+                throw(ArgumentError("Either nchunks/ntasks or chunksize must be a positive integer (or chunking=false)."))
             end
             if nchunks > 0 && chunksize > 0
-                throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer"))
+                throw(ArgumentError("nchunks/ntasks and chunksize are mutually exclusive and only one of them may be a positive integer"))
             end
             C = chunksize > 0 ? FixedSize : FixedCount
         end
@@ -162,8 +179,9 @@ struct StaticScheduler{C <: ChunkingMode} <: Scheduler
 end
 
 function StaticScheduler(;
-        nchunks::Union{Integer, Nothing} = nothing,
-        chunksize::Union{Integer, Nothing} = nothing,
+        nchunks::MaybeInteger = NotGiven(),
+        ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
+        chunksize::MaybeInteger = NotGiven(),
         chunking::Bool = true,
         split::Symbol = :batch)
     if !chunking
@@ -171,25 +189,31 @@ function StaticScheduler(;
         chunksize = -1
     else
         # only choose nchunks default if chunksize hasn't been specified
-        if isnothing(nchunks) && isnothing(chunksize)
+        if !isgiven(nchunks) && !isgiven(chunksize) && !isgiven(ntasks)
             nchunks = nthreads(:default)
             chunksize = -1
         else
-            nchunks = isnothing(nchunks) ? -1 : nchunks
-            chunksize = isnothing(chunksize) ? -1 : chunksize
+            if isgiven(nchunks) && isgiven(ntasks)
+                throw(ArgumentError("nchunks and ntasks are aliases and only one may be provided"))
+            end
+            nchunks = isgiven(nchunks) ? nchunks :
+                      isgiven(ntasks) ? ntasks : -1
+            chunksize = isgiven(chunksize) ? chunksize : -1
         end
     end
     StaticScheduler(nchunks, chunksize, split; chunking)
 end
 
 function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::StaticScheduler)
-    print("StaticScheduler", "\n")
+    print(io, "StaticScheduler", "\n")
     cstr = _chunkingstr(s)
     println(io, "├ Chunking: ", cstr)
     print(io, "└ Threadpool: default")
 end
 
 """
+    GreedyScheduler (aka :greedy)
+
 A greedy dynamic scheduler. The elements of the collection are first put
 into a `Channel` and then dynamic, non-sticky tasks are spawned to process
 channel content in parallel.
@@ -216,12 +240,14 @@ Base.@kwdef struct GreedyScheduler <: Scheduler
 end
 
 function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::GreedyScheduler)
-    print("GreedyScheduler", "\n")
+    print(io, "GreedyScheduler", "\n")
     println(io, "├ Num. tasks: ", s.ntasks)
     print(io, "└ Threadpool: default")
 end
 
 """
+    SerialScheduler (aka :serial)
+
 A scheduler for turning off any multithreading and running the code in serial.
 It aims to make parallel functions like, e.g., `tmapreduce(sin, +, 1:100)`
 behave like their serial counterparts, e.g., `mapreduce(sin, +, 1:100)`.
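The `SerialScheduler` equivalence stated in this docstring, as a quick sanity check:

```julia
using OhMyThreads: tmapreduce, SerialScheduler

# Runs everything on the calling task; useful for debugging parallel code.
tmapreduce(sin, +, 1:100; scheduler = SerialScheduler()) ≈ mapreduce(sin, +, 1:100)  # true
```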
@@ -238,4 +264,14 @@ chunking_mode(::Type{SerialScheduler}) = NoChunking
 chunking_enabled(s::Scheduler) = chunking_enabled(typeof(s))
 chunking_enabled(::Type{S}) where {S <: Scheduler} = chunking_mode(S) != NoChunking
 
+scheduler_from_symbol(s::Symbol; kwargs...) = scheduler_from_symbol(Val{s}; kwargs...)
+scheduler_from_symbol(::Type{Val{:static}}; kwargs...) = StaticScheduler(; kwargs...)
+scheduler_from_symbol(::Type{Val{:dynamic}}; kwargs...) = DynamicScheduler(; kwargs...)
+scheduler_from_symbol(::Type{Val{:greedy}}; kwargs...) = GreedyScheduler(; kwargs...)
+scheduler_from_symbol(::Type{Val{:serial}}; kwargs...) = SerialScheduler(; kwargs...)
+function scheduler_from_symbol(::Type{Val{T}}; kwargs...) where {T}
+    # fallback
+    throw(ArgumentError("unknown scheduler symbol :$T"))
+end
+
 end # module
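How the `Val`-based dispatch above resolves symbols, as a sketch (`scheduler_from_symbol` lives in the internal `Schedulers` module and is not exported):

```julia
using OhMyThreads.Schedulers: scheduler_from_symbol

scheduler_from_symbol(:static; nchunks = 2)  # StaticScheduler with 2 chunks
scheduler_from_symbol(:serial)               # SerialScheduler
# scheduler_from_symbol(:nonsense)           # would throw ArgumentError (fallback method)
```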
diff --git a/test/runtests.jl b/test/runtests.jl
index 3eee363b..9174800a 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -61,7 +61,7 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +,
             end
         end
     end
-end
+end;
 
 @testset "ChunkSplitters.Chunk" begin
     x = rand(100)
@@ -76,7 +76,7 @@ end
             @test isnothing(tforeach(x -> sin.(x), chnks; scheduler))
         end
     end
-end
+end;
 
 @testset "macro API" begin
     # basic
@@ -132,15 +132,51 @@ end
             init=0.0
         end
         i
-    end) == 55.0
+    end) === 55.0
     @test @tasks(for i in 1:10
         @set begin
             reducer=(+)
             init=0.0*im
         end
         i
-    end) == (55.0 + 0.0im)
+    end) === (55.0 + 0.0im)
+    # top-level "kwargs"
+    @test @tasks(for i in 1:3
+        @set scheduler=:static
+        @set ntasks=1
+        i
+    end) |> isnothing
+    @test @tasks(for i in 1:3
+        @set scheduler=:static
+        @set nchunks=2
+        i
+    end) |> isnothing
+    @test @tasks(for i in 1:3
+        @set scheduler=:dynamic
+        @set chunksize=2
+        i
+    end) |> isnothing
+    @test @tasks(for i in 1:3
+        @set scheduler=:dynamic
+        @set chunking=false
+        i
+    end) |> isnothing
+    @test_throws ArgumentError @tasks(for i in 1:3
+        @set scheduler=DynamicScheduler()
+        @set chunking=false
+        i
+    end)
+    @test_throws MethodError @tasks(for i in 1:3
+        @set scheduler=:serial
+        @set chunking=false
+        i
+    end)
+    @test_throws MethodError @tasks(for i in 1:3
+        @set scheduler=:dynamic
+        @set asd=123
+        i
+    end)
 
     # TaskLocalValue
     ntd = 2*Threads.nthreads()
@@ -166,36 +202,44 @@ end
         x[] += 1
         x[]
     end) == 1.5 * ntd # if a new x would be allocated per iteration, we'd get ntd here.
-    # TaskLocalValue (begin ... end block)
-    @test @tasks(for i in 1:10
+    # TaskLocalValue (begin ... end block), inferred TLV type
+    @test @inferred (() -> @tasks for i in 1:10
         @local begin
-            C::Matrix{Int64} = fill(4, 3, 3)
-            x::Vector{Float64} = fill(5.0, 3)
+            C = fill(4, 3, 3)
+            x = fill(5.0, 3)
         end
         @set reducer = (+)
         sum(C * x)
-    end) == 1800
+    end)() == 1800
 
     # hygiene / escaping
     var = 3
     sched = StaticScheduler()
+    sched_sym = :static
     data = rand(10)
     red = (a,b) -> a+b
+    n = 2
     @test @tasks(for d in data
         @set scheduler=sched
         @set reducer=red
         var * d
     end) ≈ var * sum(data)
+    @test @tasks(for d in data
+        @set scheduler=sched_sym
+        @set ntasks=n
+        @set reducer=red
+        var * d
+    end) ≈ var * sum(data)
 
     struct SingleInt
         x::Int
     end
     @test @tasks(for _ in 1:10
-        @local C::SingleInt = SingleInt(var)
+        @local C = SingleInt(var)
         @set reducer=+
         C.x
     end) == 10*var
-end
+end;
 
 @testset "WithTaskLocals" begin
     let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)),
         y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0))
@@ -234,7 +278,7 @@ end
     @test @fetch(h()) == (4, 4)
     @test @fetch(h()) == (5, 5)
     end
-end
+end;
 
 @testset "chunking mode + chunksize option" begin
     for sched in (DynamicScheduler, StaticScheduler)
@@ -262,6 +306,39 @@ end
         @test treduce(+, 1:10; scheduler) ≈ reduce(+, 1:10)
         end
     end
-end
+end;
+
+@testset "top-level kwargs" begin
+    res_tmr = mapreduce(sin, +, 1:10000)
+
+    # scheduler not given
+    @test tmapreduce(sin, +, 1:10000; ntasks=2) ≈ res_tmr
+    @test tmapreduce(sin, +, 1:10000; nchunks=2) ≈ res_tmr
+    @test tmapreduce(sin, +, 1:10000; split=:scatter) ≈ res_tmr
+    @test tmapreduce(sin, +, 1:10000; chunksize=2) ≈ res_tmr
+    @test tmapreduce(sin, +, 1:10000; chunking=false) ≈ res_tmr
+
+    # scheduler isa Scheduler
+    @test tmapreduce(sin, +, 1:10000; scheduler=StaticScheduler()) ≈ res_tmr
+    @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=DynamicScheduler())
+    @test_throws ArgumentError tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=DynamicScheduler())
+    @test_throws ArgumentError tmapreduce(sin, +, 1:10000; split=:scatter, scheduler=StaticScheduler())
+    @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=SerialScheduler())
+
+    # scheduler isa Symbol
+    for s in (:dynamic, :static, :serial, :greedy)
+        @test tmapreduce(sin, +, 1:10000; scheduler=s, init=0.0) ≈ res_tmr
+    end
+    for s in (:dynamic, :static, :greedy)
+        @test tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=s, init=0.0) ≈ res_tmr
+    end
+    for s in (:dynamic, :static)
+        @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr
+        @test tmapreduce(sin, +, 1:10000; chunking=false, scheduler=s) ≈ res_tmr
+        @test tmapreduce(sin, +, 1:10000; nchunks=3, scheduler=s) ≈ res_tmr
+        @test tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=s) ≈ res_tmr
+        @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, nchunks=2, scheduler=s) ≈ res_tmr
+    end
+end;
 
 # Todo way more testing, and easier tests to deal with
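Putting the new top-level kwargs together, mirroring the tests above (a usage sketch, not part of the diff):

```julia
using OhMyThreads: tmapreduce, DynamicScheduler

# Function API: scheduler options as plain keyword arguments.
tmapreduce(sin, +, 1:10_000; scheduler = :static, chunksize = 10)
tmapreduce(sin, +, 1:10_000; ntasks = 2)  # scheduler defaults to :dynamic

# Mixing a constructed Scheduler with scheduler kwargs is ambiguous and rejected:
# tmapreduce(sin, +, 1:10_000; ntasks = 2, scheduler = DynamicScheduler())  # ArgumentError
```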