Add option to not provide return type to map/collect #6

Merged · 6 commits · Jan 29, 2024
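In short: after this change, `tmap` and `tcollect` can be called without an explicit output type. A sketch of the two call styles described by the docstrings in this diff (the array and function arguments here are illustrative, not from the diff):

```julia
using ThreadsBasics

A = rand(1000)

tmap(sin, A)            # new in this PR: no output type required
tmap(sin, Float64, A)   # still supported; typically allocates less

tcollect(sin(x) for x in A)             # new: bare generator
tcollect(Float64, (sin(x) for x in A))  # as before, with an asserted eltype
```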
2 changes: 2 additions & 0 deletions Project.toml
@@ -4,10 +4,12 @@ authors = ["Mason Protter <[email protected]>"]
version = "0.2.0"

[deps]
+BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
ChunkSplitters = "ae650224-84b6-46f8-82ea-d812ca08434e"
StableTasks = "91464d47-22a1-43fe-8b7f-2d57ee82463f"

[compat]
+BangBang = "0.4"
ChunkSplitters = "2.1"
StableTasks = "0.1"
julia = "1.6"
218 changes: 128 additions & 90 deletions README.md

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions readme_generator.jl
@@ -0,0 +1,21 @@
using ThreadsBasics

open("README.md", "w+") do io
    println(io, """
    # ThreadsBasics

    #### This package is in very early development and is not yet registered

    This is meant to be a simple, unambitious package that provides basic, user-friendly ways of doing
    multithreaded calculations via higher-order functions, with a focus on [data parallelism](https://en.wikipedia.org/wiki/Data_parallelism).

    It re-exports the very useful function `chunks` from [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl), and
    provides the following functions:
    """)
    for sym ∈ (:tmapreduce, :treducemap, :treduce, :tmap, :tmap!, :tforeach, :tcollect)
        println(io, "<details><summary> $sym </summary>\n<p>\n")
        println(io, Base.Docs.doc(Base.Docs.Binding(ThreadsBasics, sym)))
        # close the tags in the reverse order they were opened
        println(io, "\n</p>\n</details>")
        println(io, "\n____________________________\n")
    end
end
34 changes: 17 additions & 17 deletions src/ThreadsBasics.jl
@@ -33,7 +33,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
## Keyword arguments:

- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
-- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `nchunks::Int` (default `2*nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only …
@@ -49,8 +49,8 @@ function tmapreduce end
schedule::Symbol =:dynamic,
outputtype::Type = Any)

-Like `tmapreduce` except the order of the `f` and `op` arguments are switched. Perform a reduction over `A`,
-applying a single-argument function `f` to each element, and then combining them with the two-argument
+Like `tmapreduce` except the order of the `f` and `op` arguments is switched. This is sometimes convenient with `do`-block notation.
+Perform a reduction over `A`, applying a single-argument function `f` to each element, and then combining them with the two-argument
function `op`. `op` **must** be an [associative](https://en.wikipedia.org/wiki/Associative_property) function,
in the sense that `op(a, op(b, c)) ≈ op(op(a, b), c)`. If `op` is not (approximately) associative, you will
get undefined results.
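Since a `do`-block binds to a call's first function argument, swapping the argument order changes which piece you can write as a block. A sketch of the two styles (the inputs are illustrative, not from this diff):

```julia
using ThreadsBasics

# with `tmapreduce(f, op, A)`, the do-block is the mapping function `f`
s = tmapreduce(+, 1:10_000) do i
    abs2(sin(i))
end

# with `treducemap(op, f, A)`, the do-block is instead the reducing operator `op`
m = treducemap(abs2, randn(1000)) do a, b
    max(a, b)  # associative (and commutative), as the docstring requires
end
```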
@@ -69,7 +69,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
## Keyword arguments:

- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
-- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `nchunks::Int` (default `2*nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling should be preferred since it is more flexible and better at load balancing, and more likely to be type stable. However, `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only …
@@ -105,7 +105,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitters.jl](https://github.com/m3g/ChunkSplitters.jl).
## Keyword arguments:

- `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
-- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `nchunks::Int` (default `2*nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only …
@@ -123,25 +123,27 @@ Apply `f` to each element of `A` on multiple parallel tasks, and return `nothing`.

## Keyword arguments:

-- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `nchunks::Int` (default `2*nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`). If `:scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
"""
function tforeach end
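A minimal usage sketch for the docstring above (the array and function are illustrative, not from this diff):

```julia
using ThreadsBasics

out = zeros(1000)
tforeach(eachindex(out)) do i
    out[i] = sqrt(i)  # safe: each task writes to a disjoint set of indices
end
```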

"""
-tmap(f, ::Type{OutputType}, A::AbstractArray;
+tmap(f, [OutputType], A::AbstractArray;
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
schedule::Symbol =:dynamic)

-A multithreaded function like `Base.map`. Create a new container `similar` to `A` with element type
-`OutputType`, whose `i`th element is equal to `f(A[i])`. This container is filled in parallel on multiple tasks.
+A multithreaded function like `Base.map`. Create a new container `similar` to `A` whose `i`th element is
+equal to `f(A[i])`. This container is filled in parallel on multiple tasks. The optional argument
+`OutputType` will select a specific output type for the returned container, and will generally incur fewer
+allocations than the version where `OutputType` is not specified.

## Keyword arguments:

- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
- `nchunks::Int` (default `2*nthreads())` is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). This argument is ignored if `OutputType` is not specified.
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
"""
function tmap end
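A sketch of the two `tmap` forms this docstring now covers (inputs are illustrative):

```julia
using ThreadsBasics

A = rand(10, 10)

B = tmap(sin, A)           # new: eltype determined by `map` on each chunk
C = tmap(sin, Float64, A)  # explicit: fills `similar(A, Float64)` in place
```

Per the implementation further down, the untyped form maps over chunks and `reshape`s the result, so `B` keeps the shape of `A`.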
@@ -157,24 +157,22 @@ of `out[i] = f(A[i])` for each index `i` of `A` and `out`.

## Keyword arguments:

-- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
-- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `nchunks::Int` (default `2*nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
+- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (`:batch`) or shuffled (`:scatter`).
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
"""
function tmap! end
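A minimal sketch of in-place use, assuming matching indices as the docstring requires (names are illustrative):

```julia
using ThreadsBasics

A = rand(1000)
out = similar(A)
tmap!(x -> 2x, out, A)  # a bounds check verifies eachindex(out) == eachindex(A)
```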

"""
-tcollect(::Type{OutputType}, gen::Base.Generator{AbstractArray, F};
+tcollect(::Type{OutputType}, gen::Base.Generator{<:AbstractArray};
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
schedule::Symbol =:dynamic)

A multithreaded function like `Base.collect`. Essentially just calls `tmap` on the generator function and inputs.

## Keyword arguments:

-- `nchunks::Int` (default 2 * nthreads()) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
-- `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results!
+- `nchunks::Int` (default `2*nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead.
- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
"""
function tcollect end
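A sketch of the generator and container forms (inputs are illustrative):

```julia
using ThreadsBasics

xs = rand(100)

tcollect(Float64, (exp(x) for x in xs))  # asserted output eltype
tcollect(exp(x) for x in xs)             # new: no eltype needed

# plain containers work too; this is equivalent to tmap(identity, 1:100)
tcollect(1:100)
```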
22 changes: 18 additions & 4 deletions src/implementation.jl
@@ -6,6 +6,8 @@ using ThreadsBasics: chunks, @spawn
using Base: @propagate_inbounds
using Base.Threads: nthreads, @threads

+using BangBang: append!!
+
function tmapreduce(f, op, A;
nchunks::Int = 2 * nthreads(),
split::Symbol = :batch,
@@ -48,7 +50,7 @@ end
#-------------------------------------------------------------

function tforeach(f, A::AbstractArray; kwargs...)::Nothing
-    tmapreduce(f, (l, r)->l, A; init=nothing, outputtype=Nothing, kwargs...)
+    tmapreduce(f, (l, r)->l, A; kwargs..., init=nothing, outputtype=Nothing)
end
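Moving `kwargs...` ahead of `init` and `outputtype` is load-bearing: when keyword arguments repeat in a call, the right-most one wins, so this ordering keeps a caller-supplied `init` or `outputtype` from leaking through. A tiny standalone sketch of that rule (the function `g` is hypothetical):

```julia
# rightmost duplicate keyword wins when splatting:
g(; kwargs...) = (; kwargs..., init = nothing)
g(init = 1)  # -> (init = nothing,): the explicit `init` overrides the splat
```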

#-------------------------------------------------------------
@@ -57,10 +59,20 @@ function tmap(f, ::Type{T}, A::AbstractArray; kwargs...) where {T}
    tmap!(f, similar(A, T), A; kwargs...)
end

+function tmap(f, A; nchunks::Int = 2*nthreads(), kwargs...)
+    the_chunks = collect(chunks(A; n=nchunks))
+    # It's vital that we force split=:batch here because we're not doing a commutative operation!
+    v = tmapreduce(append!!, the_chunks; kwargs..., nchunks, split=:batch) do inds
+        map(f, @view A[inds])
+    end
+    reshape(v, size(A)...)
+end
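The combining step leans on `append!!` from BangBang, which appends in place when the element type fits and returns a widened copy when it doesn't; that is what lets per-chunk `map` results with differing eltypes combine into one output without an up-front `OutputType`. A small sketch of that behavior:

```julia
using BangBang: append!!

append!!([1, 2], [3, 4])  # eltype fits: mutates and returns [1, 2, 3, 4]
append!!([1, 2], [3.5])   # eltype widens: returns Float64[1.0, 2.0, 3.5]
```

Forcing `split=:batch` keeps chunks in their original order, so concatenation (associative but not commutative) reassembles the output correctly.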

@propagate_inbounds function tmap!(f, out, A::AbstractArray; kwargs...)
    @boundscheck eachindex(out) == eachindex(A) ||
        error("The indices of the input array must match the indices of the output array.")
-    tforeach(eachindex(A); kwargs...) do i
+    # It's vital that we force split=:batch here because we're not doing a commutative operation!
+    tforeach(eachindex(A); kwargs..., split=:batch) do i
        fAi = f(@inbounds A[i])
        out[i] = fAi
    end
@@ -69,10 +81,12 @@ end

#-------------------------------------------------------------

-function tcollect(::Type{T}, gen::Base.Generator{<:AbstractArray, F}; kwargs...) where {T, F}
+function tcollect(::Type{T}, gen::Base.Generator{<:AbstractArray}; kwargs...) where {T}
    tmap(gen.f, T, gen.iter; kwargs...)
end
-tcollect(::Type{T}, A; kwargs...) where {T} = tmap(identity, T, A; kwargs...)
+tcollect(gen::Base.Generator{<:AbstractArray}; kwargs...) = tmap(gen.f, gen.iter; kwargs...)
+
+tcollect(::Type{T}, A; kwargs...) where {T} = tmap(identity, T, A; kwargs...)
+tcollect(A; kwargs...) = tmap(identity, A; kwargs...)

end # module Implementation
6 changes: 5 additions & 1 deletion test/runtests.jl
@@ -2,7 +2,7 @@ using Test, ThreadsBasics

@testset "Basics" begin
for (~, f, op, itr) ∈ [
-(isapprox, sin, +, rand(ComplexF64, 100)),
+(isapprox, sin, +, rand(ComplexF64, 10, 10)),
(isapprox, cos, max, 1:100000),
(==, round, vcat, randn(1000)),
(==, last, *, [1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"])
@@ -25,6 +25,10 @@ using ThreadsBasics
@test all(tmap(f, Any, itr; kwargs...) .~ map_f_itr)
@test all(tcollect(Any, (f(x) for x in itr); kwargs...) .~ map_f_itr)
@test all(tcollect(Any, f.(itr); kwargs...) .~ map_f_itr)

+@test tmap(f, itr; kwargs...) ~ map_f_itr
+@test tcollect((f(x) for x in itr); kwargs...) ~ map_f_itr
+@test tcollect(f.(itr); kwargs...) ~ map_f_itr

RT = Core.Compiler.return_type(f, Tuple{eltype(itr)})
