Full scheduler::Symbol support + Keyword argument forwarding (#81)
* tmp

* tmp generic

* tests passing

* tests

* macro API update

* fix init bug

* fixes + docstrings

* changelog

* dont allow ntasks and nchunks at the same time

* Nothing -> NotGiven in scheduler.jl

* try to fix CI (despite bug)

* use init to fix CI

* readme/index.md example

* default dynamic scheduler to ntasks=nthreads

* doc/examples update

* update tls docs

* collapse docs
carstenbauer authored Mar 13, 2024
1 parent 2bd4772 commit 7e9ab31
Showing 19 changed files with 511 additions and 330 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -4,16 +4,19 @@ OhMyThreads.jl Changelog
Version 0.5.0
-------------

- ![Feature][badge-feature] The parallel functions (e.g. tmapreduce etc.) now support `scheduler::Symbol` besides `scheduler::Scheduler`. To configure the selected scheduler (e.g. set `nchunks` etc.) one may now pass keyword arguments directly into the parallel functions (they will get passed on to the scheduler constructor). Example: `tmapreduce(sin, +, 1:10; chunksize=2, scheduler=:static)`. Analogous support has been added to the macro API: (Most) settings (`@set name = value`) will now be passed on to the parallel functions as keyword arguments (which then forward them to the scheduler constructor). Note that, to avoid ambiguity, we don't support this feature for `scheduler::Scheduler` but only for `scheduler::Symbol`.
- ![Feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading.
- ![Feature][badge-feature] Added `OhMyThreads.WithTaskLocals` that represents a closure over `TaskLocalValues`, but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`)
- ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error).
- ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjunction with `@set reducer=...` and triggers a warning otherwise).
- ![Enhancement][badge-enhancement] `SerialScheduler` and `DynamicScheduler` now support the keyword argument `ntasks` as an alias for `nchunks`.
- ![Enhancement][badge-enhancement] Made `@tasks` use `OhMyThreads.WithTaskLocals` automatically as an optimization.
- ![Enhancement][badge-enhancement] Uses of `@local` within `@tasks` no longer require users to declare the type of the task-local value; it can be inferred automatically if a type is not provided.
- ![BREAKING][badge-breaking] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. (This is unlikely to break existing code but technically could because the type parameter has changed from `Bool` to `ChunkingMode`.)
- ![BREAKING][badge-breaking] `DynamicScheduler` and `StaticScheduler` don't support `nchunks=0` or `chunksize=0` any longer. Instead, chunking can now be turned off via an explicit new keyword argument `chunking=false`.
- ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must now be defined via `@local` instead of `@init` (renamed).
- ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped.
- ![BREAKING][badge-breaking] The default value of `ntasks`/`nchunks` for `DynamicScheduler` has been changed from `2*nthreads()` to `nthreads()`. With the new value we now align with `@threads :dynamic`. The old value wasn't giving good load balancing anyway, and choosing a higher value penalizes uniform use cases even more. To get the old behavior, set `nchunks=2*nthreads()`.
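Taken together, the new keyword-forwarding API from this release can be sketched as follows. This is an illustration based on the changelog entries above, not code from the commit itself; exact results depend on your session and thread count:

```julia
using OhMyThreads: tmapreduce, @tasks

# Scheduler selection via Symbol; extra keyword arguments are forwarded
# to the corresponding scheduler constructor.
tmapreduce(sin, +, 1:10; chunksize = 2, scheduler = :static)

# The new SerialScheduler turns off multithreading entirely.
tmapreduce(sin, +, 1:10; scheduler = :serial)

# Macro API: @set settings are forwarded to the parallel function as
# keyword arguments; @set init specifies the reduction's initial value.
@tasks for i in 1:10
    @set begin
        reducer = +
        ntasks = 2
        init = 0.0
    end
    sin(i)
end
```

Note that `chunksize` and `nchunks`/`ntasks` are mutually exclusive, and that keyword forwarding works only with `scheduler::Symbol`, not with a pre-constructed `scheduler::Scheduler`.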

Version 0.4.6
-------------
31 changes: 17 additions & 14 deletions README.md
@@ -38,21 +38,26 @@ focus on [data parallelism](https://en.wikipedia.org/wiki/Data_parallelism), that
 ## Example
 
 ```julia
-using OhMyThreads
+using OhMyThreads: tmapreduce, @tasks
+using BenchmarkTools: @btime
+using Base.Threads: nthreads
 
 # Variant 1: function API
-function mc_parallel(N; kw...)
-    M = tmapreduce(+, 1:N; kw...) do i
+function mc_parallel(N; ntasks=nthreads())
+    M = tmapreduce(+, 1:N; ntasks) do i
         rand()^2 + rand()^2 < 1.0
     end
     pi = 4 * M / N
     return pi
 end
 
 # Variant 2: macro API
-function mc_parallel_macro(N)
+function mc_parallel_macro(N; ntasks=nthreads())
     M = @tasks for i in 1:N
-        @set reducer=+
+        @set begin
+            reducer=+
+            ntasks=ntasks
+        end
         rand()^2 + rand()^2 < 1.0
     end
     pi = 4 * M / N
@@ -62,19 +67,17 @@ end
 N = 100_000_000
 mc_parallel(N) # gives, e.g., 3.14159924
 
-using BenchmarkTools
-
-@show Threads.nthreads() # 5 in this example
-
-@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread
-@btime mc_parallel($N) # using all 5 threads
+@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread)
+@btime mc_parallel($N) # using all threads
+@btime mc_parallel_macro($N) # using all threads
 ```
 
-Timings might be something like this:
+With 5 threads, timings might be something like this:
 
 ```
-447.093 ms (7 allocations: 624 bytes)
-89.401 ms (66 allocations: 5.72 KiB)
+417.282 ms (14 allocations: 912 bytes)
+83.578 ms (38 allocations: 3.08 KiB)
+83.573 ms (38 allocations: 3.08 KiB)
 ```
 
 (Check out the full [Parallel Monte Carlo](https://juliafolds2.github.io/OhMyThreads.jl/stable/literate/mc/mc/) example if you like.)
2 changes: 1 addition & 1 deletion docs/make.jl
@@ -28,7 +28,7 @@ makedocs(;
         ]
     ],
     repo = "https://github.com/JuliaFolds2/OhMyThreads.jl/blob/{commit}{path}#{line}",
-    format = Documenter.HTML(repolink = "https://github.com/JuliaFolds2/OhMyThreads.jl"))
+    format = Documenter.HTML(repolink = "https://github.com/JuliaFolds2/OhMyThreads.jl"; collapselevel = 1))
 
 if ci
     @info "Deploying documentation to GitHub"
31 changes: 17 additions & 14 deletions docs/src/index.md
@@ -14,21 +14,26 @@ to add the package to your Julia environment.
 ### Basic example
 
 ```julia
-using OhMyThreads
+using OhMyThreads: tmapreduce, @tasks
+using BenchmarkTools: @btime
+using Base.Threads: nthreads
 
 # Variant 1: function API
-function mc_parallel(N; kw...)
-    M = tmapreduce(+, 1:N; kw...) do i
+function mc_parallel(N; ntasks=nthreads())
+    M = tmapreduce(+, 1:N; ntasks) do i
         rand()^2 + rand()^2 < 1.0
     end
     pi = 4 * M / N
     return pi
 end
 
 # Variant 2: macro API
-function mc_parallel_macro(N)
+function mc_parallel_macro(N; ntasks=nthreads())
     M = @tasks for i in 1:N
-        @set reducer=+
+        @set begin
+            reducer=+
+            ntasks=ntasks
+        end
         rand()^2 + rand()^2 < 1.0
     end
     pi = 4 * M / N
@@ -38,19 +43,17 @@ end
 N = 100_000_000
 mc_parallel(N) # gives, e.g., 3.14159924
 
-using BenchmarkTools
-
-@show Threads.nthreads() # 5 in this example
-
-@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread
-@btime mc_parallel($N) # using all 5 threads
+@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread)
+@btime mc_parallel($N) # using all threads
+@btime mc_parallel_macro($N) # using all threads
 ```
 
-Timings might be something like this:
+With 5 threads, timings might be something like this:
 
 ```
-447.093 ms (7 allocations: 624 bytes)
-89.401 ms (66 allocations: 5.72 KiB)
+417.282 ms (14 allocations: 912 bytes)
+83.578 ms (38 allocations: 3.08 KiB)
+83.573 ms (38 allocations: 3.08 KiB)
 ```
 
 (Check out the full [Parallel Monte Carlo](@ref) example if you like.)
3 changes: 2 additions & 1 deletion docs/src/literate/integration/integration.jl
@@ -29,7 +29,6 @@ end
 # interval, as a multiple of the number of available Julia threads.
 
 using Base.Threads: nthreads
-@show nthreads()
 
 N = nthreads() * 1_000_000
 
@@ -82,3 +81,5 @@ using BenchmarkTools
 
 # Because the problem is trivially parallel - all threads do the same thing and don't need
 # to communicate - we expect an ideal speedup of (close to) the number of available threads.
+
+nthreads()
19 changes: 15 additions & 4 deletions docs/src/literate/integration/integration.md
@@ -46,13 +46,12 @@ interval, as a multiple of the number of available Julia threads.
 
 ````julia
 using Base.Threads: nthreads
-@show nthreads()
 
 N = nthreads() * 1_000_000
 ````
 
 ````
-5000000
+10000000
 ````
 
 Calling `trapezoidal` we do indeed find the (approximate) value of $\pi$.
 
@@ -101,6 +100,10 @@ end
 # end
 ````
 
+````
+trapezoidal_parallel (generic function with 1 method)
+````
+
 First, we check the correctness of our parallel implementation.
 
 ````julia
@@ -120,14 +123,22 @@ using BenchmarkTools
 ````
 
 ````
-12.782 ms (0 allocations: 0 bytes)
-2.563 ms (37 allocations: 3.16 KiB)
+24.348 ms (0 allocations: 0 bytes)
+2.457 ms (69 allocations: 6.05 KiB)
 ````
 
 Because the problem is trivially parallel - all threads do the same thing and don't need
 to communicate - we expect an ideal speedup of (close to) the number of available threads.
 
+````julia
+nthreads()
+````
+
+````
+10
+````
+
 ---
 
 *This page was generated using [Literate.jl](https://github.com/fredrikekre/Literate.jl).*
8 changes: 4 additions & 4 deletions docs/src/literate/juliaset/juliaset.jl
@@ -111,12 +111,12 @@ img = zeros(Int, N, N)
 # the load balancing of the default dynamic scheduler. The latter divides the overall
 # workload into tasks that can then be dynamically distributed among threads to adjust the
 # per-thread load. We can try to fine tune and improve the load balancing further by
-# increasing the `nchunks` parameter of the scheduler, that is, creating more and smaller
-# tasks.
+# increasing the `ntasks` parameter of the scheduler, that is, creating more tasks with
+# smaller per-task workload.
 
 using OhMyThreads: DynamicScheduler
 
-@btime compute_juliaset_parallel!($img; scheduler=DynamicScheduler(; nchunks=N)) samples=10 evals=3;
+@btime compute_juliaset_parallel!($img; ntasks=N, scheduler=:dynamic) samples=10 evals=3;
 
 # Note that while this turns out to be a bit faster, it comes at the expense of much more
 # allocations.
@@ -126,4 +126,4 @@
 using OhMyThreads: StaticScheduler
 
-@btime compute_juliaset_parallel!($img; scheduler=StaticScheduler()) samples=10 evals=3;
+@btime compute_juliaset_parallel!($img; scheduler=:static) samples=10 evals=3;
18 changes: 9 additions & 9 deletions docs/src/literate/juliaset/juliaset.md
@@ -121,9 +121,9 @@ img = zeros(Int, N, N)
 ````
-nthreads() = 5
-138.157 ms (0 allocations: 0 bytes)
-40.373 ms (67 allocations: 6.20 KiB)
+nthreads() = 10
+131.295 ms (0 allocations: 0 bytes)
+31.422 ms (68 allocations: 6.09 KiB)
 ````
 
@@ -135,17 +135,17 @@
 As stated above, the per-pixel computation is non-uniform. Hence, we do benefit from
 the load balancing of the default dynamic scheduler. The latter divides the overall
 workload into tasks that can then be dynamically distributed among threads to adjust the
 per-thread load. We can try to fine tune and improve the load balancing further by
-increasing the `nchunks` parameter of the scheduler, that is, creating more and smaller
-tasks.
+increasing the `ntasks` parameter of the scheduler, that is, creating more tasks with
+smaller per-task workload.
 
 ````julia
 using OhMyThreads: DynamicScheduler
 
-@btime compute_juliaset_parallel!($img; scheduler=DynamicScheduler(; nchunks=N)) samples=10 evals=3;
+@btime compute_juliaset_parallel!($img; ntasks=N, scheduler=:dynamic) samples=10 evals=3;
 ````
 
 ````
-31.751 ms (12011 allocations: 1.14 MiB)
+17.438 ms (12018 allocations: 1.11 MiB)
 ````
 
@@ -158,11 +158,11 @@ To quantify the impact of load balancing we can opt out of dynamic scheduling and
 ````julia
 using OhMyThreads: StaticScheduler
 
-@btime compute_juliaset_parallel!($img; scheduler=StaticScheduler()) samples=10 evals=3;
+@btime compute_juliaset_parallel!($img; scheduler=:static) samples=10 evals=3;
 ````
 
 ````
-63.147 ms (37 allocations: 3.26 KiB)
+30.097 ms (73 allocations: 6.23 KiB)
 ````

4 changes: 2 additions & 2 deletions docs/src/literate/mc/mc.jl
@@ -74,8 +74,8 @@ using Base.Threads: nthreads
 
 using OhMyThreads: StaticScheduler
 
-@btime mc_parallel($N) samples=10 evals=3;
-@btime mc_parallel($N; scheduler = StaticScheduler()) samples=10 evals=3;
+@btime mc_parallel($N; scheduler=:dynamic) samples=10 evals=3; # default
+@btime mc_parallel($N; scheduler=:static) samples=10 evals=3;
 
 # ## Manual parallelization
 #
28 changes: 14 additions & 14 deletions docs/src/literate/mc/mc.md
@@ -34,7 +34,7 @@ mc(N)
 ````
 
 ````
-3.14145748
+3.14171236
 ````
 
 ## Parallelization with `tmapreduce`
@@ -69,7 +69,7 @@ mc_parallel(N)
 ````
 
 ````
-3.14134792
+3.14156496
 ````
 
 Let's run a quick benchmark.
@@ -86,9 +86,9 @@ using Base.Threads: nthreads
 ````
 
 ````
-nthreads() = 5
-317.745 ms (0 allocations: 0 bytes)
-88.384 ms (66 allocations: 5.72 KiB)
+nthreads() = 10
+301.636 ms (0 allocations: 0 bytes)
+41.864 ms (68 allocations: 5.81 KiB)
 ````
 
@@ -100,13 +100,13 @@ and compare the performance of static and dynamic scheduling (with default parameters)
 ````julia
 using OhMyThreads: StaticScheduler
 
-@btime mc_parallel($N) samples=10 evals=3;
-@btime mc_parallel($N; scheduler=StaticScheduler()) samples=10 evals=3;
+@btime mc_parallel($N; scheduler=:dynamic) samples=10 evals=3; # default
+@btime mc_parallel($N; scheduler=:static) samples=10 evals=3;
 ````
 
 ````
-88.222 ms (66 allocations: 5.72 KiB)
-88.203 ms (36 allocations: 2.98 KiB)
+41.839 ms (68 allocations: 5.81 KiB)
+41.838 ms (68 allocations: 5.81 KiB)
 ````
 
@@ -121,7 +121,7 @@ simulation. Finally, we fetch the results and compute the average estimate for $\pi$
 using OhMyThreads: @spawn, chunks
 
 function mc_parallel_manual(N; nchunks = nthreads())
-    tasks = map(chunks(1:N; n = nchunks)) do idcs # TODO: replace by `tmap` once ready
+    tasks = map(chunks(1:N; n = nchunks)) do idcs
         @spawn mc(length(idcs))
     end
     pi = sum(fetch, tasks) / nchunks
@@ -132,7 +132,7 @@ mc_parallel_manual(N)
 ````
 
 ````
-3.1414609999999996
+3.14180504
 ````
 
 And this is the performance:
@@ -142,7 +142,7 @@ And this is the performance:
 ````
 
 ````
-64.042 ms (31 allocations: 2.80 KiB)
+30.224 ms (65 allocations: 5.70 KiB)
 ````
 
@@ -161,8 +161,8 @@ end samples=10 evals=3;
 ````
 
 ````
-88.041 ms (0 allocations: 0 bytes)
-63.427 ms (0 allocations: 0 bytes)
+41.750 ms (0 allocations: 0 bytes)
+30.148 ms (0 allocations: 0 bytes)
 ````
