Skip to content

Commit

Permalink
switch to using ChunkSplitters
Browse files Browse the repository at this point in the history
  • Loading branch information
MasonProtter committed Jan 28, 2024
1 parent b178ef2 commit 6763c91
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 150 deletions.
10 changes: 5 additions & 5 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
name = "ThreadsBasics"
uuid = "67456a42-1dca-4109-a031-0a68de7e3ad5"
authors = ["Mason Protter <[email protected]>"]
version = "0.1.0"
version = "0.2.0"

[deps]
SplittablesBase = "171d559e-b47b-412a-8079-5efa626c420e"
ChunkSplitters = "ae650224-84b6-46f8-82ea-d812ca08434e"
StableTasks = "91464d47-22a1-43fe-8b7f-2d57ee82463f"

[compat]
julia = "1.6"
ChunkSplitters = "2"
StableTasks = "0.1"
SplittablesBase = "0.1"
julia = "1.6"

[extras]
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test"]
test = ["Test"]
163 changes: 142 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,145 @@
This is meant to be a simple, unambitious package that provides basic, user-friendly ways of doing
multithreaded calculations via higher-order functions, with a focus on [data parallelism](https://en.wikipedia.org/wiki/Data_parallelism).

It provides

- `tmap(f, ::Type{OutputType}, A::AbstractArray)` which `map`s
the function `f` over the array `A` assuming that the output type of `f` is `OutputType`.
- `tmap!(f, out, A::AbstractArray)` which is like `tmap` except instead of creating an output container of a certain element type, it mutates a provided container `out` such that `out[i] = f(A[i])`, (i.e. a parallelized version of `Base.map!`).
- `tforeach(f, itr)` which is like `Base.foreach` except parallelized over multiple tasks, simply calling the function `f` on each element of `itr`.
- The iterable `itr` can be any type which supports `halve` and `amount` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl).
- `treduce(op, itr; [init])` which is a parallelized version of `Base.reduce`, combining each element of `itr` with a two-argument function `op`. Reduce may seem unfamiliar to some, but the function `sum(A)` is simply `reduce(+, A)`, for example.
- `op` must be [associative](https://en.wikipedia.org/wiki/Associative_property) in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`.
- The reduction is performed in a tree-like manner.
- The iterable `itr` can be any type which supports `halve` and `amount` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl).
- `tmapreduce(f, op, itr)` which is a parallelized version of `Base.mapreduce`, applying a one-argument function `f` to each element of `itr` and combining them with a two-argument function `op`. Mapreduce may seem unfamiliar to some, but the function `sum(f, A)` is simply `mapreduce(f, +, A)`, for example.
- `op` must be [associative](https://en.wikipedia.org/wiki/Associative_property) in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`.
- The reduction is performed in a tree-like manner.
- The iterable `itr` can be any type which supports `halve` and `amount` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl).
- `treducemap(op, f, itr; [init]) = tmapreduce(f, op, itr; [init])` because sometimes this is more convenient for `do`-block notation, depending on the calculation.


Users can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used) to all of these functions
- `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
- `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`.
It re-exports the very useful function `chunks` from [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl), and
provides the following functions

___________________

tmapreduce(f, op, A::AbstractArray;
[init],
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
schedule::Symbol =:dynamic,
outputtype::Type = Any)

A multithreaded function like `Base.mapreduce`. Perform a reduction over `A`, applying a single-argument
function `f` to each element, and then combining them with the two-argument function `op`. `op` **must** be an
[associative](https://en.wikipedia.org/wiki/Associative_property) function, in the sense that
`op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will get undefined
results.

For a very well known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing

tmapreduce(√, +, [1, 2, 3, 4, 5])

is the parallelized version of

(√1 + √2) + (√3 + √4) + √5

This data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).

## Keyword arguments:

- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.

___________________

treducemap(op, f, A::AbstractArray;
[init],
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
schedule::Symbol =:dynamic,
outputtype::Type = Any)

Like `tmapreduce` except the order of the `f` and `op` arguments are switched. Perform a reduction over `A`,
applying a single-argument function `f` to each element, and then combining them with the two-argument
function `op`. `op` **must** be an [associative](https://en.wikipedia.org/wiki/Associative_property) function,
in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will
get undefined results.

For a very well known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing

treducemap(+, √, [1, 2, 3, 4, 5])

is the parallelized version of

(√1 + √2) + (√3 + √4) + √5


This data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).

## Keyword arguments:

- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling should be preferred since it is more flexible and better at load balancing, and more likely to be type stable. However, `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.

_____________________________________

treduce(op, A::AbstractArray; [init],
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
schedule::Symbol =:dynamic,
outputtype::Type = Any)

Like `tmapreduce` except the order of the `f` and `op` arguments are switched. Perform a reduction over `A`,
applying a single-argument function `f` to each element, and then combining them with the two-argument
function `op`. `op` **must** be an [associative](https://en.wikipedia.org/wiki/Associative_property) function,
in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will
get undefined results.

For a very well known example of `reduce`, `sum(A)` is equivalent to `reduce(+, A)`. Doing

treduce(+, [1, 2, 3, 4, 5])

is the parallelized version of

(1 + 2) + (3 + 4) + 5


This data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).

## Keyword arguments:

- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.

_______________________________________________


tforeach(f, A::AbstractArray;
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
schedule::Symbol =:dynamic) :: Nothing

Apply `f` to each element of `A` on multiple parallel tasks, and return `nothing`.

## Keyword arguments:

- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.


__________________________

tmap(f, ::Type{OutputType}, A::AbstractArray;
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
schedule::Symbol =:dynamic)

A multithreaded function like `Base.map`. Create a new container `similar` to `A` with element type
`OutputType`, whose `i`th element is equal to `f(A[i])`. This container is filled in parallel on multiple tasks.

## Keyword arguments:

- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.

______________________________


Loading

0 comments on commit 6763c91

Please sign in to comment.