From 6763c918774c69f02af0e70f31d92b718c5ee20b Mon Sep 17 00:00:00 2001
From: Mason Protter
Date: Sun, 28 Jan 2024 22:26:26 +0100
Subject: [PATCH] switch to using ChunkSplitters

---
 Project.toml         |  10 +--
 README.md            | 163 +++++++++++++++++++++++++++++-----
 src/ThreadsBasics.jl | 205 +++++++++++++++++++------------------------
 test/runtests.jl     |  36 ++++++--
 4 files changed, 264 insertions(+), 150 deletions(-)

diff --git a/Project.toml b/Project.toml
index 98fb6999..115828ad 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,19 +1,19 @@
 name = "ThreadsBasics"
 uuid = "67456a42-1dca-4109-a031-0a68de7e3ad5"
 authors = ["Mason Protter "]
-version = "0.1.0"
+version = "0.2.0"
 
 [deps]
-SplittablesBase = "171d559e-b47b-412a-8079-5efa626c420e"
+ChunkSplitters = "ae650224-84b6-46f8-82ea-d812ca08434e"
 StableTasks = "91464d47-22a1-43fe-8b7f-2d57ee82463f"
 
 [compat]
-julia = "1.6"
+ChunkSplitters = "2"
 StableTasks = "0.1"
-SplittablesBase = "0.1"
+julia = "1.6"
 
 [extras]
 Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
 
 [targets]
-test = ["Test"]
\ No newline at end of file
+test = ["Test"]
diff --git a/README.md b/README.md
index 02a0c425..5d1d5236 100644
--- a/README.md
+++ b/README.md
@@ -5,24 +5,145 @@
 This is meant to be a simple, unambitious package that provides basic, user-friendly ways of doing
 multithreaded calculations via higher-order functions, with a focus on
 [data parallelism](https://en.wikipedia.org/wiki/Data_parallelism).
 
-It provides
-
-- `tmap(f, ::Type{OutputType}, A::AbstractArray)` which `map`s the function `f` over the array `A` assuming that the output type of `f` is `OutputType`.
-- `tmap!(f, out, A::AbstractArray)` which is like `tmap` except instead of creating an output container of a certain element type, it mutates a provided container `out` such that `out[i] = f(A[i])`, (i.e. a parallelized version of `Base.map!`).
-- `tforeach(f, itr)` which is like `Base.foreach` except parallelized over multiple tasks, simply calling the function `f` on each element of `itr`.
-    - The iterable `itr` can be any type which supports `halve` and `amount` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl).
-- `treduce(op, itr; [init])` which is a parallelized version of `Base.reduce`, combining each element of `itr` with a two-argument function `op`. Reduce may seem unfamiliar to some, but the function `sum(A)` is simply `reduce(+, A)`, for example.
-    - `op` must be [associative](https://en.wikipedia.org/wiki/Associative_property) in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`.
-    - The reduction is performed in a tree-like manner.
-    - The iterable `itr` can be any type which supports `halve` and `amount` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl).
-- `tmapreduce(f, op, itr)` which is a parallelized version of `Base.mapreduce`, applying a one-argument function `f` to each element of `itr` and combining them with a two-argument function `op`. Mapreduce may seem unfamiliar to some, but the function `sum(f, A)` is simply `mapreduce(f, +, A)`, for example.
-    - `op` must be [associative](https://en.wikipedia.org/wiki/Associative_property) in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`.
-    - The reduction is performed in a tree-like manner.
-    - The iterable `itr` can be any type which supports `halve` and `amount` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl).
-- `treducemap(op, f, itr; [init]) = tmapreduce(f, op, itr; [init])` because sometimes this is more convenient for `do`-block notation, depending on the calculation.
-
-
-Users can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used) to all of these functions
-- `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
-- `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`.
+It re-exports the very useful function `chunks` from [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl), and
+provides the following functions:
+
+___________________
+
+    tmapreduce(f, op, A::AbstractArray;
+               [init],
+               nchunks::Int = 2 * nthreads(),
+               split::Symbol = :batch,
+               schedule::Symbol = :dynamic,
+               outputtype::Type = Any)
+
+A multithreaded function like `Base.mapreduce`. Perform a reduction over `A`, applying a single-argument
+function `f` to each element, and then combining them with the two-argument function `op`. `op` **must** be an
+[associative](https://en.wikipedia.org/wiki/Associative_property) function, in the sense that
+`op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will get undefined
+results.
+
+For a well-known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing
+
+    tmapreduce(√, +, [1, 2, 3, 4, 5])
+
+is the parallelized version of
+
+    (√1 + √2) + (√3 + √4) + √5
+
+The data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
+A usage sketch follows the keyword list below.
+
+## Keyword arguments:
+
+- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
+- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
+needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
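+
+For illustration, a minimal usage sketch (the mapped function and data are arbitrary, and this assumes
+Julia was started with more than one thread, e.g. `julia -t4`):
+
+    using ThreadsBasics
+
+    # sum of squares over parallel tasks; `+` is associative, so this is safe
+    tmapreduce(x -> x^2, +, 1:1000)  # same result as mapreduce(x -> x^2, +, 1:1000)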
+
+___________________
+
+    treducemap(op, f, A::AbstractArray;
+               [init],
+               nchunks::Int = 2 * nthreads(),
+               split::Symbol = :batch,
+               schedule::Symbol = :dynamic,
+               outputtype::Type = Any)
+
+Like `tmapreduce`, except that the order of the `f` and `op` arguments is switched. Perform a reduction over `A`,
+applying a single-argument function `f` to each element, and then combining them with the two-argument
+function `op`. `op` **must** be an [associative](https://en.wikipedia.org/wiki/Associative_property) function,
+in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will
+get undefined results.
+
+For a well-known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing
+
+    treducemap(+, √, [1, 2, 3, 4, 5])
+
+is the parallelized version of
+
+    (√1 + √2) + (√3 + √4) + √5
+
+The data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
+
+## Keyword arguments:
+
+- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling should be preferred since it is more flexible, better at load balancing, and more likely to be type stable. However, `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
+- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
+needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
+
+_____________________________________
+
+    treduce(op, A::AbstractArray; [init],
+            nchunks::Int = 2 * nthreads(),
+            split::Symbol = :batch,
+            schedule::Symbol = :dynamic,
+            outputtype::Type = Any)
+
+A multithreaded function like `Base.reduce`. Perform a reduction over `A`, combining each element with the
+two-argument function `op`. `op` **must** be an [associative](https://en.wikipedia.org/wiki/Associative_property) function,
+in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will
+get undefined results.
+
+For a well-known example of `reduce`, `sum(A)` is equivalent to `reduce(+, A)`. Doing
+
+    treduce(+, [1, 2, 3, 4, 5])
+
+is the parallelized version of
+
+    (1 + 2) + (3 + 4) + 5
+
+The data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
+
+## Keyword arguments:
+
+- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
+- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
+needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
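+
+A similarly hedged sketch for `treduce` (any associative reducer works):
+
+    using ThreadsBasics
+
+    treduce(+, 1:1000)          # == sum(1:1000) == 500500
+    treduce(max, rand(10_000))  # parallel maximum of the array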
+
+_______________________________________________
+
+
+    tforeach(f, A::AbstractArray;
+             nchunks::Int = 2 * nthreads(),
+             split::Symbol = :batch,
+             schedule::Symbol = :dynamic) :: Nothing
+
+Apply `f` to each element of `A` on multiple parallel tasks, and return `nothing`.
+
+## Keyword arguments:
+
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). Since `tforeach` performs no reduction, this only affects the order in which `f` is applied.
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
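+
+A quick sketch for `tforeach` (the `results` buffer is illustrative; each task writes only to its own
+indices, so there is no data race):
+
+    using ThreadsBasics
+
+    results = zeros(1000)
+    tforeach(eachindex(results)) do i
+        results[i] = sqrt(i)
+    end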
+
+
+__________________________
+
+    tmap(f, ::Type{OutputType}, A::AbstractArray;
+         nchunks::Int = 2 * nthreads(),
+         split::Symbol = :batch,
+         schedule::Symbol = :dynamic)
+
+A multithreaded function like `Base.map`. Create a new container `similar` to `A` with element type
+`OutputType`, whose `i`th element is equal to `f(A[i])`. This container is filled in parallel on multiple tasks.
+
+## Keyword arguments:
+
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). Each result is written to its own index of the output, so this only affects how the work is distributed among tasks.
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
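+
+A short sketch for `tmap` (note that the output element type must be supplied up front):
+
+    using ThreadsBasics
+
+    tmap(sin, Float64, 1:100)  # ≈ map(sin, 1:100), but filled in parallel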
+
+______________________________
+
+
diff --git a/src/ThreadsBasics.jl b/src/ThreadsBasics.jl
index aff8d703..55a9a1f4 100644
--- a/src/ThreadsBasics.jl
+++ b/src/ThreadsBasics.jl
@@ -1,61 +1,61 @@
 module ThreadsBasics
 
-using Base: @propagate_inbounds
-using Base.Threads: nthreads
-using StableTasks: @spawn
-using SplittablesBase: amount, halve
-export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach
+using StableTasks: @spawn
+using ChunkSplitters: chunks
 
-struct NoInit end
+export chunks, treduce, tmapreduce, treducemap, tmap, tmap!, tforeach
 
 """
-    tmapreduce(f, op, itr; [init]
-               chunks_per_thread::Int = 2,
-               chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))
-
-A multithreaded function like `Base.mapreduce`. Perform a reduction over `itr`, applying a single-argument
+    tmapreduce(f, op, A::AbstractArray;
+               [init],
+               nchunks::Int = 2 * nthreads(),
+               split::Symbol = :batch,
+               schedule::Symbol = :dynamic,
+               outputtype::Type = Any)
+
+A multithreaded function like `Base.mapreduce`. Perform a reduction over `A`, applying a single-argument
 function `f` to each element, and then combining them with the two-argument function `op`. `op` **must** be an
 [associative](https://en.wikipedia.org/wiki/Associative_property) function, in the sense that
 `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will get undefined
-results. 
+results.
 
-For a very well known example of `mapreduce`, `sum(f, itr)` is equivalent to `mapreduce(f, +, itr)`. Doing
+For a well-known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing
 
-    treducemap(+, √, [1, 2, 3, 4, 5])
+    tmapreduce(√, +, [1, 2, 3, 4, 5])
 
 is the parallelized version of
 
     (√1 + √2) + (√3 + √4) + √5
 
-This reduction is tree-based.
+The data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
 
 ## Keyword arguments:
 
-- Uses can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used)
-    - `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
-    - `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`. The chunks are then operated on in sequential with `mapreduce(f, op, chunk; [init])`, and then the chunks are combined using `op`.
-- `init` optional keyword argument forwarded to `reduce` for the sequential parts of the calculation.
+- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
+- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
+needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
 """
-function tmapreduce(f, op, itr;
-                    init=NoInit(),
-                    chunks_per_thread::Int = 2,
-                    chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))
-    _tmapreduce(f, op, itr; init, chunk_size)
-end
+function tmapreduce end
 
 """
-    treducemap(op, f, itr; [init]
-               chunks_per_thread::Int = 2,
-               chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))
-
-Like `tmapreduce` except the order of the `f` and `op` arguments are switched. Perform a reduction over `itr`,
+    treducemap(op, f, A::AbstractArray;
+               [init],
+               nchunks::Int = 2 * nthreads(),
+               split::Symbol = :batch,
+               schedule::Symbol = :dynamic,
+               outputtype::Type = Any)
+
+Like `tmapreduce`, except that the order of the `f` and `op` arguments is switched. Perform a reduction over `A`,
 applying a single-argument function `f` to each element, and then combining them with the two-argument
 function `op`. `op` **must** be an [associative](https://en.wikipedia.org/wiki/Associative_property) function,
 in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will
 get undefined results.
 
-For a very well known example of `mapreduce`, `sum(f, itr)` is equivalent to `mapreduce(f, +, itr)`. Doing
+For a well-known example of `mapreduce`, `sum(f, A)` is equivalent to `mapreduce(f, +, A)`. Doing
 
     treducemap(+, √, [1, 2, 3, 4, 5])
 
@@ -63,43 +63,35 @@ is the parallelized version of
 
     (√1 + √2) + (√3 + √4) + √5
 
-This reduction is tree-based.
+
+The data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
 
 ## Keyword arguments:
 
-- Uses can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used)
-    - `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
-    - `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`. The chunks are then operated on sequentially with `mapreduce(f, op, chunk; [init])`, and then the chunks are combined using `op`.
 - `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling should be preferred since it is more flexible, better at load balancing, and more likely to be type stable. However, `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
+- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
+needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
 """
-treducemap(op, f, itr; kwargs...) = tmapreduce(f, op, itr; kwargs...)
-
-function _tmapreduce(f, op, itr; chunk_size::Int, init)
-    if amount(itr) <= chunk_size
-        kwargs = init === NoInit() ? (;) : (; init)
-        mapreduce(f, op, itr; kwargs...)
-    else
-        l, r = halve(itr)
-        # @show l, r
-        # error()
-        task_r = @spawn _tmapreduce(f, op, r; chunk_size, init)
-        result_l = _tmapreduce(f, op, l; chunk_size, init)
-        op(result_l, fetch(task_r))
-    end
-end
+function treducemap end
+
 
 """
-    treduce(op, itr; [init]
-            chunks_per_thread::Int = 2,
-            chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))
+    treduce(op, A::AbstractArray; [init],
+            nchunks::Int = 2 * nthreads(),
+            split::Symbol = :batch,
+            schedule::Symbol = :dynamic,
+            outputtype::Type = Any)
 
-A multithreaded parallel function like `Base.reduce`. Perform a reduction over `itr`, then combining each
-element with the two-argument function `op`. `op` **must** be an
-[associative](https://en.wikipedia.org/wiki/Associative_property) function, in the sense that
-`op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will get undefined
-results.
+A multithreaded function like `Base.reduce`. Perform a reduction over `A`, combining each element with the
+two-argument function `op`. `op` **must** be an [associative](https://en.wikipedia.org/wiki/Associative_property) function,
+in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will
+get undefined results.
 
-For a very well known example of `reduce`, `sum(itr)` is equivalent to `reduce(+, itr)`. Doing
+For a well-known example of `reduce`, `sum(A)` is equivalent to `reduce(+, A)`. Doing
 
     treduce(+, [1, 2, 3, 4, 5])
 
@@ -107,89 +99,72 @@ is the parallelized version of
 
     (1 + 2) + (3 + 4) + 5
 
-This reduction is tree-based.
+
+The data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
 
 ## Keyword arguments:
 
-- Uses can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used)
-    - `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
-    - `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`. The chunks are then operated on sequentially with `reduce(op, chunk; [init])`, and then the chunks are combined using `op`.
-- `init` optional keyword argument forwarded to `reduce` for the sequential parts of the calculation.
+- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
+- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
+needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
 """
-function treduce(op, itr;
-                 chunks_per_thread::Int = 2,
-                 chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))
-    _tmapreduce(identity, op, itr; chunk_size, init)
-end
-
+function treduce end
 
 """
-    tforeach(f, itr; chunks_per_thread::Int = 2,
-             chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))
+    tforeach(f, A::AbstractArray;
+             nchunks::Int = 2 * nthreads(),
+             split::Symbol = :batch,
+             schedule::Symbol = :dynamic) :: Nothing
 
-Apply `f` to each element of `itr` on multiple parallel tasks, with each thread being given a number of chunks
-from `itr` to work on approximately equal to `chunks_per_thread`. Instead of providing `chunks_per_thread`,
-users can provide `chunk_size` directly, which means that the data will be split into chunks of at most
-`chunk_size` in length.
+Apply `f` to each element of `A` on multiple parallel tasks, and return `nothing`.
 
 ## Keyword arguments:
 
-- Uses can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used)
-    - `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
-    - `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`. The chunks are then operated on in sequential with `reduce(op, chunk; [init])`, and then the chunks are combined using `op`.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). Since `tforeach` performs no reduction, this only affects the order in which `f` is applied.
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
 """
-function tforeach(f, itr;
-                  chunks_per_thread::Int = 2,
-                  chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))::Nothing
-    tmapreduce(f, (l, r)->l, itr; init=nothing)
-end
+function tforeach end
 
 """
-    tmap(f, ::Type{T}, A::AbstractArray;
-         chunks_per_thread::Int = 2,
-         chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads()))
-
-A multithreaded function like `Base.map`. Create a new container `similar` to `A` with eltype `T`, whose `i`th
-element is equal to `f(A[i])`. This container is filled in parallel on multiple tasks.
+    tmap(f, ::Type{OutputType}, A::AbstractArray;
+         nchunks::Int = 2 * nthreads(),
+         split::Symbol = :batch,
+         schedule::Symbol = :dynamic)
+
+A multithreaded function like `Base.map`. Create a new container `similar` to `A` with element type
+`OutputType`, whose `i`th element is equal to `f(A[i])`. This container is filled in parallel on multiple tasks.
 
 ## Keyword arguments:
 
-- Uses can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used)
-    - `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
-    - `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`. The chunks are then operated on sequentially.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). Each result is written to its own index of the output, so this only affects how the work is distributed among tasks.
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
 """
-function tmap(f, ::Type{T}, A::AbstractArray;
-              chunks_per_thread::Int = 2,
-              chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads()))) where {T}
    tmap!(f, similar(A, T), A; chunk_size)
-end
-
+function tmap end
 
 """
     tmap!(f, out, A::AbstractArray;
-          chunks_per_thread::Int = 2,
-          chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads()))
+          nchunks::Int = 2 * nthreads(),
+          split::Symbol = :batch,
+          schedule::Symbol = :dynamic)
 
 A multithreaded function like `Base.map!`. In parallel on multiple tasks, this function assigns
 `out[i] = f(A[i])` for each index `i` of `A` and `out`.
 
 ## Keyword arguments:
 
-- Uses can provide *either* `chunk_size`, or `chunks_per_thread` (and if both are provided, `chunk_size` is used)
-    - `chunks_per_thread` (defaults `2`), will try to split up `itr` so that each thread will recieve *approximately* `chunks_per_thread` pieces of data to work on. More `chunks_per_thread`, typically means better [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)).
-    - `chunk_size` (computed based on `chunks_per_thread` by deault). Data from `itr` will be divided in half using `halve` from [SplittablesBase.jl](https://github.com/JuliaFolds2/SplittablesBase.jl) until those chunks have an `SplittablesBase.amount` less than or equal to `chunk_size`. The chunks are then operated on sequentially.
+- `nchunks::Int` (default `2 * nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). Each result is written to its own index of `out`, so this only affects how the work is distributed among tasks.
+- `schedule::Symbol`, either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
 """
-@propagate_inbounds function tmap!(f, out, A::AbstractArray;
-                                   chunks_per_thread::Int = 2,
-                                   chunk_size = max(1, amount(itr) ÷ (chunks_per_thread * nthreads())))
-    @boundscheck eachindex(out) == eachindex(A) ||
-        error("The indices of the input array must match the indices of the output array.")
-    tforeach(eachindex(A)) do i
-        fAi = f(@inbounds A[i])
-        @inbounds out[i] = fAi
-    end
-    out
-end
+function tmap! end
+
+
+include("implementation.jl")
+
 end # module ThreadsBasics
diff --git a/test/runtests.jl b/test/runtests.jl
index 34d8fe79..f0e459c1 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -1,16 +1,34 @@
 using Test, ThreadsBasics
 
 @testset "Basics" begin
-    for (f, op, itr) ∈ [(sin, *, rand(ComplexF64, 100)),
-                        (cos, +, (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)),
-                        ]
+    for (~, f, op, itr) ∈ [
+        (isapprox, sin, +, rand(ComplexF64, 100)),
+        (isapprox, cos, max, 1:100000),
+        (==, round, vcat, randn(1000)),
+        (==, last, *, [1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"])
+        ]
+        @testset for schedule ∈ (:static, :dynamic,)
+            @testset for split ∈ (:batch, :scatter)
+                if split == :scatter # scatter only works for commutative operators
+                    if op ∈ (vcat, *)
+                        continue
+                    end
+                end
+                for nchunks ∈ (1, 2, 6, 10, 100)
+                    kwargs = (; schedule, split, nchunks)
+                    mapreduce_f_op_itr = mapreduce(f, op, itr)
+                    @test tmapreduce(f, op, itr; kwargs...) ~ mapreduce_f_op_itr
+                    @test treducemap(op, f, itr; kwargs...) ~ mapreduce_f_op_itr
+                    @test treduce(op, f.(itr); kwargs...) ~ mapreduce_f_op_itr
 
-        @test tmapreduce(f, op, itr) ≈ mapreduce(f, op, itr) ≈ treducemap(op, f, itr)
-        @test treduce(op, itr) ≈ reduce(op, itr)
-        @test tmap(f, ComplexF64, collect(itr)) ≈ map(f, itr)
+                    map_f_itr = map(f, itr)
+                    @test all(tmap(f, Any, itr; kwargs...) .~ map_f_itr)
+                    RT = Core.Compiler.return_type(f, Tuple{eltype(itr)})
+                    @test tmap(f, RT, itr; kwargs...) ~ map_f_itr
+                end
+            end
+        end
     end
-    # https://github.com/JuliaFolds2/SplittablesBase.jl/issues/1
-    @test_broken tmapreduce(last, join, Dict(:a =>"one", :b=>"two", :c=>"three", :d=>"four", :e=>"five"))
 end
 
-# Todo way more testing
+# TODO: way more testing, and tests that are easier to work with