diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 79ccc684..d04c5626 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,9 +10,8 @@ jobs: fail-fast: false matrix: version: - - '1.6' - '1.9' - - '1.10.0' + - '1.10' - 'nightly' os: - ubuntu-latest @@ -41,4 +40,4 @@ jobs: with: file: lcov.info env: - JULIA_NUM_THREADS: 4 + JULIA_NUM_THREADS: 4,2 diff --git a/Project.toml b/Project.toml index d19d0d1e..ed89a9d4 100644 --- a/Project.toml +++ b/Project.toml @@ -11,7 +11,7 @@ StableTasks = "91464d47-22a1-43fe-8b7f-2d57ee82463f" [compat] BangBang = "0.4" ChunkSplitters = "2.1" -StableTasks = "0.1.2" +StableTasks = "0.1.4" julia = "1.6" [extras] diff --git a/README.md b/README.md index b888aa8d..6b732dfe 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ OhMyThreads.jl re-exports the very useful function `chunks` from
``` -tmapreduce(f, op, A::AbstractArray; +tmapreduce(f, op, A::AbstractArray...; [init], nchunks::Int = nthreads(), split::Symbol = :batch, @@ -48,7 +48,11 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte * `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation. * `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. * `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! - * `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + + * `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. 
+ * `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. * `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. @@ -63,7 +67,7 @@ ____________________________
``` -treducemap(op, f, A::AbstractArray; +treducemap(op, f, A::AbstractArray...; [init], nchunks::Int = nthreads(), split::Symbol = :batch, @@ -92,7 +96,11 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte * `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation. * `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. * `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! - * `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling should be preferred since it is more flexible and better at load balancing, and more likely to be type stable. However, `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + + * `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. 
+ * `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. * `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. @@ -107,7 +115,8 @@ ____________________________
``` -treduce(op, A::AbstractArray; [init], +treduce(op, A::AbstractArray...; + [init], nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic, @@ -135,7 +144,11 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte * `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation. * `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. * `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! - * `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + + * `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. 
+ * `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. * `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. @@ -150,7 +163,7 @@ ____________________________
``` -tmap(f, [OutputElementType], A::AbstractArray; +tmap(f, [OutputElementType], A::AbstractArray...; nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic) @@ -162,7 +175,11 @@ A multithreaded function like `Base.map`. Create a new container `similar` to `A * `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. * `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! - * `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + + * `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + * `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. 
+ * `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. @@ -174,7 +191,7 @@ ____________________________
``` -tmap!(f, out, A::AbstractArray; +tmap!(f, out, A::AbstractArray...; nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic) @@ -186,7 +203,11 @@ A multithreaded function like `Base.map!`. In parallel on multiple tasks, this f * `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. * `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! - * `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + + * `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + * `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. 
+ * `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. @@ -198,7 +219,7 @@ ____________________________
``` -tforeach(f, A::AbstractArray; +tforeach(f, A::AbstractArray...; nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic) :: Nothing @@ -216,7 +237,11 @@ end * `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. * `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! - * `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + + * `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + * `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. 
This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. @@ -238,7 +263,11 @@ A multithreaded function like `Base.collect`. Essentially just calls `tmap` on t ## Keyword arguments: * `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. - * `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + + * `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + * `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + * `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. 
This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. diff --git a/src/OhMyThreads.jl b/src/OhMyThreads.jl index 166070a5..7db5788b 100644 --- a/src/OhMyThreads.jl +++ b/src/OhMyThreads.jl @@ -7,7 +7,7 @@ using ChunkSplitters: chunks export chunks, treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect """ - tmapreduce(f, op, A::AbstractArray; + tmapreduce(f, op, A::AbstractArray...; [init], nchunks::Int = nthreads(), split::Symbol = :batch, @@ -35,14 +35,17 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte - `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation. - `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. - `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! -- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. 
`:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. +- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. - `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. """ function tmapreduce end """ - treducemap(op, f, A::AbstractArray; + treducemap(op, f, A::AbstractArray...; [init], nchunks::Int = nthreads(), split::Symbol = :batch, @@ -71,7 +74,10 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte - `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation.
- `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. - `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! -- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling should be preferred since it is more flexible and better at load balancing, and more likely to be type stable. However, `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. +- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. 
This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. - `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. """ @@ -79,7 +85,8 @@ function treducemap end """ - treduce(op, A::AbstractArray; [init], + treduce(op, A::AbstractArray...; + [init], nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic, @@ -107,14 +114,17 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte - `init` optional keyword argument forwarded to `mapreduce` for the sequential parts of the calculation. - `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. - `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! -- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled.
`:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. +- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. - `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. """ function treduce end """ - tforeach(f, A::AbstractArray; + tforeach(f, A::AbstractArray...; nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic) :: Nothing @@ -129,12 +139,15 @@ A multithreaded function like `Base.foreach`.
Apply `f` to each element of `A` o - `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. - `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! -- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. +- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of + - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. 
This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tforeach end """ - tmap(f, [OutputElementType], A::AbstractArray; + tmap(f, [OutputElementType], A::AbstractArray...; nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic) @@ -149,12 +162,15 @@ fewer allocations than the version where `OutputElementType` is not specified. - `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. - `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! -- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. `:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. +- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. 
Options are one of + - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tmap end """ - tmap!(f, out, A::AbstractArray; + tmap!(f, out, A::AbstractArray...; nchunks::Int = nthreads(), split::Symbol = :batch, schedule::Symbol =:dynamic) @@ -166,7 +182,10 @@ of `out[i] = f(A[i])` for each index `i` of `A` and `out`. - `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. - `split::Symbol` (default `:batch`) is passed to `ChunkSplitters.chunks` to inform it if the data chunks to be worked on should be contiguous (:batch) or shuffled (:scatter). If `scatter` is chosen, then your reducing operator `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! -- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. 
`:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. +- `schedule::Symbol` (default `:dynamic`) determines how the parallel portions of the calculation are scheduled. Options are: + - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully: tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as doing so can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tmap! end @@ -181,7 +200,10 @@ inputs. The optional argument `OutputElementType` will select a specific element ## Keyword arguments: - `nchunks::Int` (default `nthreads()`) is passed to `ChunkSplitters.chunks` to inform it how many pieces of data should be worked on in parallel. Greater `nchunks` typically helps with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)), but at the expense of creating more overhead. -- `schedule::Symbol` either `:dynamic` or `:static` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. 
`:dynamic` scheduling is generally preferred since it is more flexible and better at load balancing, but `:static` scheduling can sometimes be more performant when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. +- `schedule::Symbol` (default `:dynamic`) determines how the parallel portions of the calculation are scheduled. Options are: + - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. + - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully: tasks on this threadpool should not be allowed to run for a long time without `yield`ing, as doing so can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tcollect end diff --git a/src/implementation.jl b/src/implementation.jl index 56d786ec..a2a6ee0c 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -9,77 +9,96 @@ using Base.Threads: nthreads, @threads using BangBang: BangBang, append!! -function tmapreduce(f, op, A; +function tmapreduce(f, op, Arrs...; nchunks::Int=nthreads(), split::Symbol=:batch, schedule::Symbol=:dynamic, outputtype::Type=Any, kwargs...) - if schedule === :dynamic - _tmapreduce(f, op, A, outputtype, nchunks, split; kwargs...) + if schedule === :dynamic + _tmapreduce(f, op, Arrs, outputtype, nchunks, split, :default; kwargs...) + elseif schedule === :interactive + _tmapreduce(f, op, Arrs, outputtype, nchunks, split, :interactive; kwargs...) 
elseif schedule === :static - _tmapreduce_static(f, op, A, outputtype, nchunks, split; kwargs...) + _tmapreduce_static(f, op, Arrs, outputtype, nchunks, split; kwargs...) else schedule_err(schedule) end end @noinline schedule_err(s) = error(ArgumentError("Invalid schedule option: $s, expected :dynamic or :static.")) -treducemap(op, f, A; kwargs...) = tmapreduce(f, op, A; kwargs...) +treducemap(op, f, A...; kwargs...) = tmapreduce(f, op, A...; kwargs...) -function _tmapreduce(f, op, A, ::Type{OutputType}, nchunks, split=:batch; kwargs...)::OutputType where {OutputType} - tasks = map(chunks(A; n=nchunks, split)) do inds - @spawn mapreduce(f, op, @view(A[inds]); kwargs...) +function _tmapreduce(f, op, Arrs, ::Type{OutputType}, nchunks, split, schedule; kwargs...)::OutputType where {OutputType} + check_all_have_same_indices(Arrs) + tasks = map(chunks(first(Arrs); n=nchunks, split)) do inds + args = map(A -> A[inds], Arrs) + @spawn schedule mapreduce(f, op, args...; kwargs...) end mapreduce(fetch, op, tasks) end -function _tmapreduce_static(f, op, A, ::Type{OutputType}, nchunks, split; kwargs...) where {OutputType} +function _tmapreduce_static(f, op, Arrs, ::Type{OutputType}, nchunks, split; kwargs...) where {OutputType} nt = nthreads() + check_all_have_same_indices(Arrs) if nchunks > nt # We could implement strategies, like round-robin, in the future throw(ArgumentError("We currently only support `nchunks <= nthreads()` for static scheduling.")) end - tasks = map(enumerate(chunks(A; n=nchunks, split))) do (c, inds) + tasks = map(enumerate(chunks(first(Arrs); n=nchunks, split))) do (c, inds) tid = @inbounds nthtid(c) - @spawnat tid mapreduce(f, op, @view(A[inds]); kwargs...) + args = map(A -> A[inds], Arrs) + @spawnat tid mapreduce(f, op, args...; kwargs...) 
end mapreduce(fetch, op, tasks) end + +check_all_have_same_indices(Arrs) = let A = first(Arrs), Arrs = Arrs[2:end] + if !all(B -> eachindex(A) == eachindex(B), Arrs) + error("The indices of the input arrays must match the indices of the output array.") + end +end + #------------------------------------------------------------- -function treduce(op, A; kwargs...) - tmapreduce(identity, op, A; kwargs...) +function treduce(op, A...; kwargs...) + tmapreduce(identity, op, A...; kwargs...) end #------------------------------------------------------------- -function tforeach(f, A::AbstractArray; kwargs...)::Nothing - tmapreduce(f, (l, r) -> l, A; kwargs..., init=nothing, outputtype=Nothing) +function tforeach(f, A...; kwargs...)::Nothing + tmapreduce(f, (l, r) -> l, A...; kwargs..., init=nothing, outputtype=Nothing) end #------------------------------------------------------------- -function tmap(f, ::Type{T}, A::AbstractArray; kwargs...) where {T} - tmap!(f, similar(A, T), A; kwargs...) +function tmap(f, ::Type{T}, A::AbstractArray, _Arrs::AbstractArray...; kwargs...) where {T} + Arrs = (A, _Arrs...) + tmap!(f, similar(A, T), Arrs...; kwargs...) end -function tmap(f, A; nchunks::Int= 2*nthreads(), kwargs...) +function tmap(f, A::AbstractArray, _Arrs::AbstractArray...; nchunks::Int=nthreads(), kwargs...) + Arrs = (A, _Arrs...) + check_all_have_same_indices(Arrs) the_chunks = collect(chunks(A; n=nchunks)) # It's vital that we force split=:batch here because we're not doing a commutative operation! v = tmapreduce(append!!, the_chunks; kwargs..., nchunks, split=:batch) do inds - map(f, @view A[inds]) + args = map(A -> @view(A[inds]), Arrs) + map(f, args...) end reshape(v, size(A)...) end -@propagate_inbounds function tmap!(f, out, A::AbstractArray; kwargs...) 
- @boundscheck eachindex(out) == eachindex(A) || error("The indices of the input array must match the indices of the output array.") +@propagate_inbounds function tmap!(f, out, A::AbstractArray, _Arrs::AbstractArray...; kwargs...) + Arrs = (A, _Arrs...) + @boundscheck check_all_have_same_indices((out, Arrs...)) # It's vital that we force split=:batch here because we're not doing a commutative operation! - tforeach(eachindex(A); kwargs..., split=:batch) do i - fAi = f(@inbounds A[i]) - out[i] = fAi + tforeach(eachindex(out); kwargs..., split=:batch) do i + args = map(A -> @inbounds(A[i]), Arrs) + res = f(args...) + out[i] = res end out end diff --git a/src/tools.jl b/src/tools.jl index 93591e1d..af2e8899 100644 --- a/src/tools.jl +++ b/src/tools.jl @@ -18,7 +18,7 @@ end """ taskid() :: UInt -Return a `UInt` identifier for the current running [Task](https://docs.julialang.org/en/v1/base/parallel/#Core.Task). This identifier will +Return a `UInt` identifier for the currently running [Task](https://docs.julialang.org/en/v1/base/parallel/#Core.Task). This identifier will be unique so long as references to the task it came from still exist. 
""" taskid() = objectid(current_task()) diff --git a/test/runtests.jl b/test/runtests.jl index c5b8d3af..f82c2140 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,43 +1,43 @@ using Test, OhMyThreads @testset "Basics" begin - for (~, f, op, itr) ∈ [ - (isapprox, sin, +, rand(ComplexF64, 10, 10)), - (isapprox, cos, max, 1:100000), - (==, round, vcat, randn(1000)), - (==, last, *, [1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"]) + for (~, f, op, itrs) ∈ [ + (isapprox, sin∘*, +, (rand(ComplexF64, 10, 10), rand(-10:10, 10, 10))), + (isapprox, cos, max, (1:100000,)), + (==, round, vcat, (randn(1000),)), + (==, last, *, ([1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"],)) ] - @testset for schedule ∈ (:static, :dynamic,) + @testset for schedule ∈ (:static, :dynamic, :interactive) @testset for split ∈ (:batch, :scatter) if split == :scatter # scatter only works for commutative operators if op ∈ (vcat, *) continue end end - for nchunks ∈ (1, 2, 6, 10, 100) + for nchunks ∈ (1, 2, 6, 10) if schedule == :static && nchunks > Threads.nthreads() continue end kwargs = (; schedule, split, nchunks) - mapreduce_f_op_itr = mapreduce(f, op, itr) - @test tmapreduce(f, op, itr; kwargs...) ~ mapreduce_f_op_itr - @test treducemap(op, f, itr; kwargs...) ~ mapreduce_f_op_itr - @test treduce(op, f.(itr); kwargs...) ~ mapreduce_f_op_itr + mapreduce_f_op_itr = mapreduce(f, op, itrs...) + @test tmapreduce(f, op, itrs...; kwargs...) ~ mapreduce_f_op_itr + @test treducemap(op, f, itrs...; kwargs...) ~ mapreduce_f_op_itr + @test treduce(op, f.(itrs...); kwargs...) ~ mapreduce_f_op_itr - map_f_itr = map(f, itr) - @test all(tmap(f, Any, itr; kwargs...) .~ map_f_itr) - @test all(tcollect(Any, (f(x) for x in itr); kwargs...) .~ map_f_itr) - @test all(tcollect(Any, f.(itr); kwargs...) .~ map_f_itr) + map_f_itr = map(f, itrs...) + @test all(tmap(f, Any, itrs...; kwargs...) .~ map_f_itr) + @test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) 
.~ map_f_itr) + @test all(tcollect(Any, f.(itrs...); kwargs...) .~ map_f_itr) - @test tmap(f, itr; kwargs...) ~ map_f_itr - @test tcollect((f(x) for x in itr); kwargs...) ~ map_f_itr - @test tcollect(f.(itr); kwargs...) ~ map_f_itr + @test tmap(f, itrs...; kwargs...) ~ map_f_itr + @test tcollect((f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr + @test tcollect(f.(itrs...); kwargs...) ~ map_f_itr - RT = Core.Compiler.return_type(f, Tuple{eltype(itr)}) + RT = Core.Compiler.return_type(f, Tuple{eltype.(itrs)...}) - @test tmap(f, RT, itr; kwargs...) ~ map_f_itr - @test tcollect(RT, (f(x) for x in itr); kwargs...) ~ map_f_itr - @test tcollect(RT, f.(itr); kwargs...) ~ map_f_itr + @test tmap(f, RT, itrs...; kwargs...) ~ map_f_itr + @test tcollect(RT, (f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr + @test tcollect(RT, f.(itrs...); kwargs...) ~ map_f_itr end end end