Prepare for ChunkSplitters 3.0 (#119)
* prepare for ChunkSplitters 3.0

* update

* support symbol and split

* blub

* check chunks incompatible kwargs

* error when chunks or index_chunks input + chunking=true

* rename + error on incompatible chunking kwargs

* throw errors for unsupported split symbols

* don't export split types

* Update src/schedulers.jl

Co-authored-by: Mason Protter <[email protected]>

* Update src/schedulers.jl

Co-authored-by: Mason Protter <[email protected]>

* minor improvements

* Update src/schedulers.jl

Co-authored-by: Fredrik Ekre <[email protected]>

* Update src/schedulers.jl

Co-authored-by: Fredrik Ekre <[email protected]>

* Update src/schedulers.jl

Co-authored-by: Fredrik Ekre <[email protected]>

* split -> type parameter

---------

Co-authored-by: Mason Protter <[email protected]>
Co-authored-by: Fredrik Ekre <[email protected]>
3 people authored Sep 26, 2024
1 parent 491dac9 commit 4b91f66
Showing 13 changed files with 176 additions and 109 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -1,9 +1,16 @@
OhMyThreads.jl Changelog
=========================

Version 0.7.0
-------------
- ![BREAKING][badge-breaking] We now use ChunkSplitters version 3.0. The function `OhMyThreads.chunks` has been renamed to `OhMyThreads.index_chunks`. The new functions `index_chunks` and `chunks` (different from the old one with the same name!) are now exported. See ChunkSplitters.jl for more information.
- ![BREAKING][badge-breaking] If you provide `chunks` or `index_chunks` as input, we now disable the internal chunking without a warning. Previously, we showed a warning unless you had set `chunking=false`. In contrast, we now throw an error when you set any incompatible chunking-related keyword arguments.
- ![Deprecation][badge-deprecation] The `split` options `:batch` and `:scatter` are now deprecated (they still work but will be dropped at some point). Use `:consecutive` and `:roundrobin`, respectively, instead.
- ![Enhancement][badge-enhancement] The `split` keyword argument can now also be a `<: OhMyThreads.Split`. Compared to providing a `Symbol`, the former can potentially give better performance. For example, you can replace `:consecutive` by `OhMyThreads.Consecutive()` and `:roundrobin` by `OhMyThreads.RoundRobin()`.
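The 0.7 changes above can be condensed into a short migration sketch. This is illustrative, not from the commit itself; it assumes OhMyThreads 0.7 with ChunkSplitters 3.0 is installed and that `tmapreduce` forwards the `split` keyword to its scheduler. The names `data`, `idx`, `vws`, `s1`, and `s2` are hypothetical.

```julia
# Migration sketch for OhMyThreads 0.7 / ChunkSplitters 3.0 (assumes the
# released 0.7 API; variable names are illustrative).
using OhMyThreads: tmapreduce, index_chunks, chunks, Consecutive

data = collect(1:10)

# The old `OhMyThreads.chunks` is now `index_chunks`; it yields index ranges.
idx = collect(index_chunks(data; n = 2))  # two ranges covering eachindex(data)

# The new `chunks` (different from the old one!) yields views of the data itself.
vws = collect(chunks(data; n = 2))

# `split` accepts the new symbols or, equivalently, a `Split` instance:
s1 = tmapreduce(x -> x^2, +, data; split = :consecutive)
s2 = tmapreduce(x -> x^2, +, data; split = Consecutive())
```

Note that `:batch` and `:scatter` still work here but are deprecated in favor of `:consecutive` and `:roundrobin`.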

Version 0.6.2
-------------
- ![Enhancement][badge-enhancement] Added API support for `enumerate(chunks(...))`. Best used in combination with `chunking=false`.
- ![Enhancement][badge-enhancement] Added API support for `enumerate(chunks(...))`. Best used in combination with `chunking=false`

Version 0.6.1
-------------
2 changes: 1 addition & 1 deletion Project.toml
@@ -12,7 +12,7 @@ TaskLocalValues = "ed4db957-447d-4319-bfb6-7fa9ae7ecf34"
[compat]
Aqua = "0.8"
BangBang = "0.3.40, 0.4"
ChunkSplitters = "2.4"
ChunkSplitters = "3"
StableTasks = "0.1.5"
TaskLocalValues = "0.1"
Test = "1"
8 changes: 4 additions & 4 deletions docs/src/literate/falsesharing/falsesharing.jl
@@ -30,11 +30,11 @@ data = rand(1_000_000 * nthreads());
#
# A common, manual implementation of this idea might look like this:

using OhMyThreads: @spawn, chunks
using OhMyThreads: @spawn, index_chunks

function parallel_sum_falsesharing(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
for i in idcs
psums[c] += data[i]
@@ -102,7 +102,7 @@ nthreads()

function parallel_sum_tasklocal(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
local s = zero(eltype(data))
for i in idcs
@@ -131,7 +131,7 @@ end
# using `map` and reusing the built-in (sequential) `sum` function on each parallel task:

function parallel_sum_map(data; nchunks = nthreads())
ts = map(chunks(data, n = nchunks)) do idcs
ts = map(index_chunks(data, n = nchunks)) do idcs
@spawn @views sum(data[idcs])
end
return sum(fetch.(ts))
8 changes: 4 additions & 4 deletions docs/src/literate/falsesharing/falsesharing.md
@@ -39,11 +39,11 @@ catastrophic numerical errors due to potential rearrangements of terms in the su
A common, manual implementation of this idea might look like this:

````julia
using OhMyThreads: @spawn, chunks
using OhMyThreads: @spawn, index_chunks

function parallel_sum_falsesharing(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
for i in idcs
psums[c] += data[i]
@@ -132,7 +132,7 @@ into `psums` (once!).
````julia
function parallel_sum_tasklocal(data; nchunks = nthreads())
psums = zeros(eltype(data), nchunks)
@sync for (c, idcs) in enumerate(chunks(data; n = nchunks))
@sync for (c, idcs) in enumerate(index_chunks(data; n = nchunks))
@spawn begin
local s = zero(eltype(data))
for i in idcs
@@ -168,7 +168,7 @@ using `map` and reusing the built-in (sequential) `sum` function on each paralle

````julia
function parallel_sum_map(data; nchunks = nthreads())
ts = map(chunks(data, n = nchunks)) do idcs
ts = map(index_chunks(data, n = nchunks)) do idcs
@spawn @views sum(data[idcs])
end
return sum(fetch.(ts))
8 changes: 4 additions & 4 deletions docs/src/literate/mc/mc.jl
@@ -79,15 +79,15 @@ using OhMyThreads: StaticScheduler

# ## Manual parallelization
#
# First, using the `chunks` function, we divide the iteration interval `1:N` into
# First, using the `index_chunks` function, we divide the iteration interval `1:N` into
# `nthreads()` parts. Then, we apply a regular (sequential) `map` to spawn a Julia task
# per chunk. Each task will locally and independently perform a sequential Monte Carlo
# simulation. Finally, we fetch the results and compute the average estimate for $\pi$.

using OhMyThreads: @spawn, chunks
using OhMyThreads: @spawn, index_chunks

function mc_parallel_manual(N; nchunks = nthreads())
tasks = map(chunks(1:N; n = nchunks)) do idcs
tasks = map(index_chunks(1:N; n = nchunks)) do idcs
@spawn mc(length(idcs))
end
pi = sum(fetch, tasks) / nchunks
@@ -104,7 +104,7 @@ mc_parallel_manual(N)
# `mc(length(idcs))` is faster than the implicit task-local computation within
# `tmapreduce` (which itself is a `mapreduce`).

idcs = first(chunks(1:N; n = nthreads()))
idcs = first(index_chunks(1:N; n = nthreads()))

@btime mapreduce($+, $idcs) do i
rand()^2 + rand()^2 < 1.0
8 changes: 4 additions & 4 deletions docs/src/literate/mc/mc.md
@@ -112,16 +112,16 @@ using OhMyThreads: StaticScheduler

## Manual parallelization

First, using the `chunks` function, we divide the iteration interval `1:N` into
First, using the `index_chunks` function, we divide the iteration interval `1:N` into
`nthreads()` parts. Then, we apply a regular (sequential) `map` to spawn a Julia task
per chunk. Each task will locally and independently perform a sequential Monte Carlo
simulation. Finally, we fetch the results and compute the average estimate for $\pi$.

````julia
using OhMyThreads: @spawn, chunks
using OhMyThreads: @spawn, index_chunks

function mc_parallel_manual(N; nchunks = nthreads())
tasks = map(chunks(1:N; n = nchunks)) do idcs
tasks = map(index_chunks(1:N; n = nchunks)) do idcs
@spawn mc(length(idcs))
end
pi = sum(fetch, tasks) / nchunks
@@ -151,7 +151,7 @@ It is faster than `mc_parallel` above because the task-local computation
`tmapreduce` (which itself is a `mapreduce`).

````julia
idcs = first(chunks(1:N; n = nthreads()))
idcs = first(index_chunks(1:N; n = nthreads()))

@btime mapreduce($+, $idcs) do i
rand()^2 + rand()^2 < 1.0
4 changes: 2 additions & 2 deletions docs/src/literate/tls/tls.jl
@@ -102,12 +102,12 @@ res ≈ res_naive
# iterations (i.e. matrix pairs) for which this task is responsible.
# Before we learn how to do this more conveniently, let's implement this idea of a
# task-local temporary buffer (for each parallel task) manually.
using OhMyThreads: chunks, @spawn
using OhMyThreads: index_chunks, @spawn
using Base.Threads: nthreads

function matmulsums_manual(As, Bs)
N = size(first(As), 1)
tasks = map(chunks(As; n = 2 * nthreads())) do idcs
tasks = map(index_chunks(As; n = 2 * nthreads())) do idcs
@spawn begin
local C = Matrix{Float64}(undef, N, N)
map(idcs) do i
4 changes: 2 additions & 2 deletions docs/src/literate/tls/tls.md
@@ -140,12 +140,12 @@ Before we learn how to do this more conveniently, let's implement this idea of a
task-local temporary buffer (for each parallel task) manually.

````julia
using OhMyThreads: chunks, @spawn
using OhMyThreads: index_chunks, @spawn
using Base.Threads: nthreads

function matmulsums_manual(As, Bs)
N = size(first(As), 1)
tasks = map(chunks(As; n = 2 * nthreads())) do idcs
tasks = map(index_chunks(As; n = 2 * nthreads())) do idcs
@spawn begin
local C = Matrix{Float64}(undef, N, N)
map(idcs) do i
13 changes: 11 additions & 2 deletions docs/src/refs/api.md
@@ -37,16 +37,25 @@ GreedyScheduler
SerialScheduler
```

## Non-Exported
## Re-exported

| | |
|------------------------|---------------------------------------------------------------------|
| `OhMyThreads.chunks` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.chunks) |
| `OhMyThreads.index_chunks` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.index_chunks) |

## Public but not exported

| | |
|------------------------|---------------------------------------------------------------------|
| `OhMyThreads.@spawn` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
| `OhMyThreads.@spawnat` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
| `OhMyThreads.@fetch` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
| `OhMyThreads.@fetchfrom` | see [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) |
| `OhMyThreads.chunks` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/dev/references/#ChunkSplitters.chunks) |
| `OhMyThreads.TaskLocalValue` | see [TaskLocalValues.jl](https://github.com/vchuravy/TaskLocalValues.jl) |
| `OhMyThreads.Split` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.Split) |
| `OhMyThreads.Consecutive` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.Consecutive) |
| `OhMyThreads.RoundRobin` | see [ChunkSplitters.jl](https://juliafolds2.github.io/ChunkSplitters.jl/stable/references/#ChunkSplitters.RoundRobin) |
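The re-exported chunking functions in the tables above, together with the new no-warning behavior for pre-chunked input, can be sketched as follows. This is an assumption-laden illustration (OhMyThreads ≥ 0.7; `tmap` accepting a pre-chunked input follows from the changelog's note that providing `chunks`/`index_chunks` disables internal chunking):

```julia
using OhMyThreads: tmap, index_chunks

v = collect(1:100)

# Passing pre-made index chunks disables OhMyThreads' internal chunking
# automatically (no warning as of 0.7); each task handles one range of indices.
partial_sums = tmap(idcs -> sum(@view v[idcs]), index_chunks(v; n = 4))

total = sum(partial_sums)

# Combining a pre-chunked input with chunking kwargs (e.g. `chunksize`) now
# throws an error instead of a warning, per the 0.7 changelog.
```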


```@docs
5 changes: 5 additions & 0 deletions src/OhMyThreads.jl
@@ -6,7 +6,12 @@ for mac in Symbol.(["@spawn", "@spawnat", "@fetch", "@fetchfrom"])
end

using ChunkSplitters: ChunkSplitters
const index_chunks = ChunkSplitters.index_chunks
const chunks = ChunkSplitters.chunks
const Split = ChunkSplitters.Split
const Consecutive = ChunkSplitters.Consecutive
const RoundRobin = ChunkSplitters.RoundRobin
export chunks, index_chunks

using TaskLocalValues: TaskLocalValues
const TaskLocalValue = TaskLocalValues.TaskLocalValue