Full scheduler::Symbol support + Keyword argument forwarding #81

Merged: 18 commits, Mar 13, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -4,16 +4,19 @@ OhMyThreads.jl Changelog
Version 0.5.0
-------------

- ![Feature][badge-feature] The parallel functions (e.g. `tmapreduce` etc.) now accept `scheduler::Symbol` besides `scheduler::Scheduler`. To configure the selected scheduler (e.g. to set `nchunks`), one may now pass keyword arguments directly into the parallel functions; they get passed on to the scheduler constructor. Example: `tmapreduce(sin, +, 1:10; chunksize=2, scheduler=:static)`. Analogous support has been added to the macro API: (most) settings (`@set name = value`) will now be passed on to the parallel functions as keyword arguments, which then forward them to the scheduler constructor (see the sketch after this list). Note that, to avoid ambiguity, this feature is only supported for `scheduler::Symbol`, not for `scheduler::Scheduler`.
- ![Feature][badge-feature] Added a `SerialScheduler` that can be used to turn off any multithreading.
- ![Feature][badge-feature] Added `OhMyThreads.WithTaskLocals`, which represents a closure over `TaskLocalValues` but can have those values materialized as an optimization (using `OhMyThreads.promise_task_local`).
- ![Feature][badge-feature] In the case `nchunks > nthreads()`, the `StaticScheduler` now distributes chunks in a round-robin fashion (instead of either implicitly decreasing `nchunks` to `nthreads()` or throwing an error).
- ![Feature][badge-feature] `@set init = ...` may now be used to specify an initial value for a reduction (only has an effect in conjunction with `@set reducer=...` and triggers a warning otherwise).
- ![Enhancement][badge-enhancement] `SerialScheduler` and `DynamicScheduler` now support the keyword argument `ntasks` as an alias for `nchunks`.
- ![Enhancement][badge-enhancement] Made `@tasks` use `OhMyThreads.WithTaskLocals` automatically as an optimization.
- ![Enhancement][badge-enhancement] Uses of `@local` within `@tasks` no longer require users to declare the type of the task-local value; it can be inferred automatically if a type is not provided.
- ![BREAKING][badge-breaking] The `DynamicScheduler` (default) and the `StaticScheduler` now support a `chunksize` argument to specify the desired size of chunks instead of the number of chunks (`nchunks`). Note that `chunksize` and `nchunks` are mutually exclusive. (This is unlikely to break existing code but technically could because the type parameter has changed from `Bool` to `ChunkingMode`.)
- ![BREAKING][badge-breaking] `DynamicScheduler` and `StaticScheduler` don't support `nchunks=0` or `chunksize=0` any longer. Instead, chunking can now be turned off via an explicit new keyword argument `chunking=false`.
- ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed).
- ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped.
- ![BREAKING][badge-breaking] The default value for `ntasks`/`nchunks` for `DynamicScheduler` has been changed from `2*nthreads()` to `nthreads()`. With the new value, we now align with `@threads :dynamic`. The old value wasn't giving good load balancing anyway, and choosing a higher value penalizes uniform use cases even more. To get the old behavior, set `nchunks=2*nthreads()`.
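
For illustration (not part of this diff): a minimal sketch of the Symbol-based scheduler selection and keyword forwarding described in the first item above, assuming a multithreaded Julia session; the `chunksize` value is arbitrary.

```julia
using OhMyThreads: tmapreduce, @tasks

# Function API: select the scheduler via a Symbol and forward scheduler
# options (here `chunksize`) as plain keyword arguments.
tmapreduce(sin, +, 1:10; chunksize=2, scheduler=:static)

# Macro API: (most) @set settings are forwarded to the underlying parallel
# call and from there to the scheduler constructor.
@tasks for i in 1:10
    @set begin
        reducer = +
        scheduler = :static
        chunksize = 2
    end
    sin(i)
end
```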

Version 0.4.6
-------------
31 changes: 17 additions & 14 deletions README.md
@@ -38,21 +38,26 @@ focus on [data parallelism](https://en.wikipedia.org/wiki/Data_parallelism), that
## Example

```julia
-using OhMyThreads
+using OhMyThreads: tmapreduce, @tasks
+using BenchmarkTools: @btime
+using Base.Threads: nthreads

# Variant 1: function API
-function mc_parallel(N; kw...)
-    M = tmapreduce(+, 1:N; kw...) do i
+function mc_parallel(N; ntasks=nthreads())
+    M = tmapreduce(+, 1:N; ntasks) do i
        rand()^2 + rand()^2 < 1.0
    end
    pi = 4 * M / N
    return pi
end

# Variant 2: macro API
-function mc_parallel_macro(N)
+function mc_parallel_macro(N; ntasks=nthreads())
    M = @tasks for i in 1:N
-        @set reducer=+
+        @set begin
+            reducer=+
+            ntasks=ntasks
+        end
        rand()^2 + rand()^2 < 1.0
    end
    pi = 4 * M / N
@@ -62,19 +67,17 @@ end
N = 100_000_000
mc_parallel(N) # gives, e.g., 3.14159924

-using BenchmarkTools
-
-@show Threads.nthreads() # 5 in this example
-
-@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread
-@btime mc_parallel($N) # using all 5 threads
+@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread)
+@btime mc_parallel($N) # using all threads
+@btime mc_parallel_macro($N) # using all threads
```

-Timings might be something like this:
+With 5 threads, timings might be something like this:

```
-447.093 ms (7 allocations: 624 bytes)
-89.401 ms (66 allocations: 5.72 KiB)
+417.282 ms (14 allocations: 912 bytes)
+83.578 ms (38 allocations: 3.08 KiB)
+83.573 ms (38 allocations: 3.08 KiB)
```

(Check out the full [Parallel Monte Carlo](https://juliafolds2.github.io/OhMyThreads.jl/stable/literate/mc/mc/) example if you like.)
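
A side note (not part of this diff): the timings above presuppose a Julia session started with multiple threads. A quick way to check from within a session:

```julia
# Julia must be launched with threads enabled, e.g. `julia --threads 5`;
# with a single thread, the parallel variants cannot run any faster.
using Base.Threads: nthreads

nthreads()  # number of threads available to this session
```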
31 changes: 17 additions & 14 deletions docs/src/index.md
@@ -14,21 +14,26 @@ to add the package to your Julia environment.
### Basic example

```julia
-using OhMyThreads
+using OhMyThreads: tmapreduce, @tasks
+using BenchmarkTools: @btime
+using Base.Threads: nthreads

# Variant 1: function API
-function mc_parallel(N; kw...)
-    M = tmapreduce(+, 1:N; kw...) do i
+function mc_parallel(N; ntasks=nthreads())
+    M = tmapreduce(+, 1:N; ntasks) do i
        rand()^2 + rand()^2 < 1.0
    end
    pi = 4 * M / N
    return pi
end

# Variant 2: macro API
-function mc_parallel_macro(N)
+function mc_parallel_macro(N; ntasks=nthreads())
    M = @tasks for i in 1:N
-        @set reducer=+
+        @set begin
+            reducer=+
+            ntasks=ntasks
+        end
        rand()^2 + rand()^2 < 1.0
    end
    pi = 4 * M / N
@@ -38,19 +43,17 @@ end
N = 100_000_000
mc_parallel(N) # gives, e.g., 3.14159924

-using BenchmarkTools
-
-@show Threads.nthreads() # 5 in this example
-
-@btime mc_parallel($N; scheduler=DynamicScheduler(; nchunks=1)) # effectively using 1 thread
-@btime mc_parallel($N) # using all 5 threads
+@btime mc_parallel($N; ntasks=1) # use a single task (and hence a single thread)
+@btime mc_parallel($N) # using all threads
+@btime mc_parallel_macro($N) # using all threads
```

-Timings might be something like this:
+With 5 threads, timings might be something like this:

```
-447.093 ms (7 allocations: 624 bytes)
-89.401 ms (66 allocations: 5.72 KiB)
+417.282 ms (14 allocations: 912 bytes)
+83.578 ms (38 allocations: 3.08 KiB)
+83.573 ms (38 allocations: 3.08 KiB)
```

(Check out the full [Parallel Monte Carlo](@ref) example if you like.)
97 changes: 76 additions & 21 deletions src/functions.jl
@@ -1,6 +1,6 @@
"""
    tmapreduce(f, op, A::AbstractArray...;
-               [scheduler::Scheduler = DynamicScheduler()],
+               [scheduler::Union{Scheduler, Symbol} = :dynamic],
               [outputtype::Type = Any],
               [init])

@@ -27,15 +27,23 @@ is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form

## Keyword arguments:

-- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information.
-- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
-- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation.
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers.
+- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
+- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation.
+
+In addition, `tmapreduce` accepts **all keyword arguments that are supported by the selected
+scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example:
+```
+tmapreduce(√, +, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)
+```
+However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`**
+(but not for `scheduler::Scheduler`).
"""
function tmapreduce end
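
For illustration (not part of this diff), a hedged sketch of how the `init` keyword documented above seeds the reduction; the values are arbitrary:

```julia
using OhMyThreads: tmapreduce

# `init` is forwarded to each task-local `mapreduce`, so it should be a
# neutral element for `op` (here `0.0` for `+`); otherwise it would be
# incorporated once per task.
tmapreduce(√, +, [1, 2, 3, 4, 5]; init=0.0)
```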

"""
    treducemap(op, f, A::AbstractArray...;
-               [scheduler::Scheduler = DynamicScheduler()],
+               [scheduler::Union{Scheduler, Symbol} = :dynamic],
               [outputtype::Type = Any],
               [init])

@@ -52,7 +60,7 @@ will get undefined results.
## Example:

```
-tmapreduce(√, +, [1, 2, 3, 4, 5])
+treducemap(+, √, [1, 2, 3, 4, 5])
```

is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form
@@ -63,15 +71,23 @@ is the parallelized version of `sum(√, [1, 2, 3, 4, 5])` in the form

## Keyword arguments:

-- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information.
-- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
-- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation.
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers.
+- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
+- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation.
+
+In addition, `treducemap` accepts **all keyword arguments that are supported by the selected
+scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example:
+```
+treducemap(+, √, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)
+```
+However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`**
+(but not for `scheduler::Scheduler`).
"""
function treducemap end

"""
    treduce(op, A::AbstractArray...;
-            [scheduler::Scheduler = DynamicScheduler()],
+            [scheduler::Union{Scheduler, Symbol} = :dynamic],
            [outputtype::Type = Any],
            [init])

@@ -97,15 +113,23 @@ is the parallelized version of `sum([1, 2, 3, 4, 5])` in the form

## Keyword arguments:

-- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information.
-- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
-- `init`: forwarded to `mapreduce` for the task-local sequential parts of the calculation.
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers.
+- `outputtype::Type` (default `Any`): will work as the asserted output type of parallel calculations. We use [StableTasks.jl](https://github.com/JuliaFolds2/StableTasks.jl) to make setting this option unnecessary, but if you experience problems with type stability, you may be able to recover it with this keyword argument.
+- `init`: initial value of the reduction. Will be forwarded to `mapreduce` for the task-local sequential parts of the calculation.
+
+In addition, `treduce` accepts **all keyword arguments that are supported by the selected
+scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example:
+```
+treduce(+, [1, 2, 3, 4, 5]; chunksize=2, scheduler=:static)
+```
+However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`**
+(but not for `scheduler::Scheduler`).
"""
function treduce end

"""
    tforeach(f, A::AbstractArray...;
-             [schedule::Scheduler = DynamicScheduler()]) :: Nothing
+             [scheduler::Union{Scheduler, Symbol} = :dynamic]) :: Nothing

A multithreaded function like `Base.foreach`. Applies `f` to each element of `A` on
multiple parallel tasks, and returns `nothing`. I.e., it is the parallel equivalent of
@@ -126,13 +150,23 @@ end

## Keyword arguments:

-- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information.
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers.
+
+In addition, `tforeach` accepts **all keyword arguments that are supported by the selected
+scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example:
+```
+tforeach(1:10; chunksize=2, scheduler=:static) do i
+    println(i^2)
+end
+```
+However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`**
+(but not for `scheduler::Scheduler`).
"""
function tforeach end

"""
    tmap(f, [OutputElementType], A::AbstractArray...;
-        [schedule::Scheduler = DynamicScheduler()])
+        [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like `Base.map`. Creates a new container `similar` to `A` and fills
it in parallel such that the `i`th element is equal to `f(A[i])`.
@@ -149,26 +183,39 @@ tmap(sin, 1:10)

## Keyword arguments:

-- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information.
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers.
+
+In addition, `tmap` accepts **all keyword arguments that are supported by the selected
+scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example:
+```
+tmap(sin, 1:10; chunksize=2, scheduler=:static)
+```
+However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`**
+(but not for `scheduler::Scheduler`).
"""
function tmap end
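
For illustration (not part of this diff), a brief sketch of the optional `OutputElementType` argument shown in the signature above:

```julia
using OhMyThreads: tmap

# Passing the output element type explicitly yields a concretely typed
# result even when inference would otherwise fall back to `Any`.
tmap(sin, Float64, 1:10)  # returns a Vector{Float64}
```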

"""
    tmap!(f, out, A::AbstractArray...;
-          [schedule::Scheduler = DynamicScheduler()])
+          [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like `Base.map!`. In parallel on multiple tasks, this function
assigns `out[i] = f(A[i])` for each index `i` of `A` and `out`.

## Keyword arguments:

-- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information.
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers.
+
+In addition, `tmap!` accepts **all keyword arguments that are supported by the selected
+scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor.
+However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`**
+(but not for `scheduler::Scheduler`).
"""
function tmap! end
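
Since `tmap!` is the only function here documented without a usage example, a hedged sketch (not part of this diff):

```julia
using OhMyThreads: tmap!

out = zeros(10)
tmap!(sin, out, 1:10)  # in parallel, sets out[i] = sin(i)
```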

"""
    tcollect([OutputElementType], gen::Union{AbstractArray, Generator{<:AbstractArray}};
-             [schedule::Scheduler = DynamicScheduler()])
+             [scheduler::Union{Scheduler, Symbol} = :dynamic])

A multithreaded function like `Base.collect`. Essentially just calls `tmap` on the
generator function and inputs.
@@ -185,6 +232,14 @@ tcollect(sin(i) for i in 1:10)

## Keyword arguments:

-- `scheduler::Scheduler` (default `DynamicScheduler()`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information.
+- `scheduler::Union{Scheduler, Symbol}` (default `:dynamic`): determines how the computation is divided into parallel tasks and how these are scheduled. See [`Scheduler`](@ref) for more information on the available schedulers.
+
+In addition, `tcollect` accepts **all keyword arguments that are supported by the selected
+scheduler**. They will simply be passed on to the corresponding `Scheduler` constructor. Example:
+```
+tcollect(sin(i) for i in 1:10; chunksize=2, scheduler=:static)
+```
+However, to avoid ambiguity, this is currently **only supported for `scheduler::Symbol`**
+(but not for `scheduler::Scheduler`).
"""
function tcollect end
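
For illustration (not part of this diff), a sketch of `tcollect` with the optional `OutputElementType`; note that a generator that is not the sole argument must be parenthesized:

```julia
using OhMyThreads: tcollect

# As with `tmap`, an explicit element type gives a concretely typed result.
tcollect(Float64, (sin(i) for i in 1:10))  # Vector{Float64}
```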