From 177672965dbf29d8c59d6e48f1ef5efbe6a9fe9d Mon Sep 17 00:00:00 2001 From: Fredrik Ekre Date: Tue, 24 Sep 2024 11:16:53 +0200 Subject: [PATCH] Add ChannelLike iterator wrapper `ChannelLike` wraps an indexable object such that it can be iterated by concurrent tasks in a safe manner similar to a `Channel`. This is instead of `Channel` in the chunked `GreedyScheduler`. --- CHANGELOG.md | 3 ++ docs/src/literate/tls/tls.jl | 7 ++++ docs/src/literate/tls/tls.md | 7 ++++ docs/src/refs/api.md | 1 + src/implementation.jl | 11 +++---- src/schedulers.jl | 9 +++--- src/types.jl | 63 ++++++++++++++++++++++++++++++++++++ 7 files changed, 91 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3170738c..264c8b42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ Version 0.7.0 - ![BREAKING][badge-breaking] If you provide a `chunks` or `index_chunks` as input we now disable the internal chunking without a warning. Previously, we did show a warning unless you had set `chunking=false`. In contrast, we now throw an error when you set any incompatible chunking related keyword arguments. - ![Deprecation][badge-deprecation] The `split` options `:batch` and `:scatter` are now deprecated (they still work but will be dropped at some point). Use `:consecutive` and `:roundrobin`, respectively, instead. - ![Enhancement][badge-enhancement] The `split` keyword argument can now also be a `<: OhMyThreads.Split`. Compared to providing a `Symbol`, the former can potentially give better performance. For example, you can replace `:consecutive` by `OhMyThreads.Consecutive()` and `:roundrobin` by `OhMyThreads.RoundRobin()`. +- ![Feature][badge-feature] `ChannelLike` is a new public (but not exported) type. `ChannelLike(itr)` provide a way to iterate over `itr` in a concurrency safe manner similar to `Channel`. See the docstring for more details. ([#121][gh-pr-121]) +- ![Enhancement][badge-enhancement] `ChannelLike` is used internally for the `GreedyScheduler` when `chunking=true`. This improves performance overall but it is especially noticeable when the number of chunks is large. ([#121][gh-pr-121]) Version 0.6.2 ------------- @@ -136,3 +138,4 @@ Version 0.2.0 [gh-issue-25]: https://github.com/JuliaFolds2/OhMyThreads.jl/issues/25 [gh-pr-5]: https://github.com/JuliaFolds2/OhMyThreads.jl/pull/5 +[gh-pr-121]: https://github.com/JuliaFolds2/OhMyThreads.jl/pull/121 diff --git a/docs/src/literate/tls/tls.jl b/docs/src/literate/tls/tls.jl index 369776e2..0e899341 100644 --- a/docs/src/literate/tls/tls.jl +++ b/docs/src/literate/tls/tls.jl @@ -382,6 +382,13 @@ sort(res_nu) ≈ sort(res_channel_flipped) @btime matmulsums_perthread_channel_flipped($As_nu, $Bs_nu; ntasks = 2 * nthreads()); @btime matmulsums_perthread_channel_flipped($As_nu, $Bs_nu; ntasks = 10 * nthreads()); +# In addition, OhMyThreads provides an iterator-wrapper type +# [`OhMyThreads.ChannelLike`](@ref) which can be used in place of a `Channel`. If +# the number of elements is large this can be more efficient since there is no +# need to copy the elements into the `Channel`. Concretely, in the example above, +# we could replace `Channel() do .. end` with +# `OhMyThreads.ChannelLike(1:length(As))`. + # ## Bumper.jl (only for the brave) # # If you are bold and want to cut down temporary allocations even more you can diff --git a/docs/src/literate/tls/tls.md b/docs/src/literate/tls/tls.md index 1ce2e759..c73646fa 100644 --- a/docs/src/literate/tls/tls.md +++ b/docs/src/literate/tls/tls.md @@ -490,6 +490,13 @@ Quick benchmark: ```` +In addition, OhMyThreads provides an iterator-wrapper type +[`OhMyThreads.ChannelLike`](@ref) which can be used in place of a `Channel`. If +the number of elements is large this can be more efficient since there is no +need to copy the elements into the `Channel`. Concretely, in the example above, +we could replace `Channel() do .. end` with +`OhMyThreads.ChannelLike(1:length(As))`. + ## Bumper.jl (only for the brave) If you are bold and want to cut down temporary allocations even more you can diff --git a/docs/src/refs/api.md b/docs/src/refs/api.md index 7f85308d..39a38ba8 100644 --- a/docs/src/refs/api.md +++ b/docs/src/refs/api.md @@ -61,4 +61,5 @@ SerialScheduler ```@docs OhMyThreads.WithTaskLocals OhMyThreads.promise_task_local +OhMyThreads.ChannelLike ``` diff --git a/src/implementation.jl b/src/implementation.jl index 57614db0..e4421858 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -1,7 +1,7 @@ module Implementation import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcollect -using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local +using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local, ChannelLike using OhMyThreads.Tools: nthtid using OhMyThreads: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, @@ -207,6 +207,7 @@ function _tmapreduce(f, ntasks = min(length(first(Arrs)), ntasks_desired) ch_len = length(first(Arrs)) end + # TODO: Use ChannelLike for iterators that support it. Dispatch on IndexLinear? ch = Channel{Tuple{eltype.(Arrs)...}}(ch_len; spawn = true) do ch for args in zip(Arrs...) put!(ch, args) @@ -255,11 +256,9 @@ function _tmapreduce(f, ntasks_desired = scheduler.ntasks ntasks = min(length(chnks), ntasks_desired) - ch = Channel{typeof(first(chnks))}(length(chnks); spawn = true) do ch - for args in chnks - put!(ch, args) - end - end + # ChunkSplitters.IndexChunks support everything needed for ChannelLike + ch = ChannelLike(chnks) + tasks = map(1:ntasks) do _ # Note, calling `promise_task_local` here is only safe because we're assuming that # Base.mapreduce isn't going to magically try to do multithreading on us... diff --git a/src/schedulers.jl b/src/schedulers.jl index 6a51a12d..8c04580a 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -233,8 +233,9 @@ end """ GreedyScheduler (aka :greedy) -A greedy dynamic scheduler. The elements of the collection are first put into a `Channel` -and then dynamic, non-sticky tasks are spawned to process the channel content in parallel. +A greedy dynamic scheduler. The elements are put into a shared workqueue and dynamic, +non-sticky, tasks are spawned to process the elements of the queue with each task taking a new +element from the queue as soon as the previous one is done. Note that elements are processed in a non-deterministic order, and thus a potential reducing function **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in @@ -249,10 +250,10 @@ some additional overhead. * Determines the number of parallel tasks to be spawned. * Setting `ntasks < nthreads()` is an effective way to use only a subset of the available threads. - `chunking::Bool` (default `false`): - * Controls whether input elements are grouped into chunks (`true`) or not (`false`) before put into the channel. This can improve the performance especially if there are many iterations each of which are computationally cheap. + * Controls whether input elements are grouped into chunks (`true`) or not (`false`) before put into the shared workqueue. This can improve the performance especially if there are many iterations each of which are computationally cheap. * If `nchunks` or `chunksize` are explicitly specified, `chunking` will be automatically set to `true`. - `nchunks::Integer` (default `10 * nthreads()`): - * Determines the number of chunks (that will eventually be put into the channel). + * Determines the number of chunks (that will eventually be put into the shared workqueue). * Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)). For `nchunks <= nthreads()` there are not enough chunks for any load balancing. - `chunksize::Integer` (default not set) * Specifies the desired chunk size (instead of the number of chunks). diff --git a/src/types.jl b/src/types.jl index f26ed842..c41d6f92 100644 --- a/src/types.jl +++ b/src/types.jl @@ -71,3 +71,66 @@ promise_task_local(f::Any) = f function (f::WithTaskLocals{F})(args...; kwargs...) where {F} promise_task_local(f)(args...; kwargs...) end + +""" + ChannelLike(itr) + +This struct wraps an indexable object such that it can be iterated by concurrent tasks in a +safe manner similar to a `Channel`. + +`ChannelLike(itr)` is conceptually similar to: +```julia +Channel{eltype(itr)}(length(itr)) do ch + foreach(i -> put!(ch, i), itr) +end +``` +i.e. creating a channel, `put!`ing all elements of `itr` into it and closing it. The +advantage is that `ChannelLike` doesn't copy the data. + +# Examples +```julia +ch = OhMyThreads.ChannelLike(1:5) + +@sync for taskid in 1:2 + Threads.@spawn begin + for i in ch + println("Task #\$taskid processing item \$i") + sleep(1 / i) + end + end +end + +# output + +Task #1 processing item 1 +Task #2 processing item 2 +Task #2 processing item 3 +Task #2 processing item 4 +Task #1 processing item 5 +``` + +Note that `ChannelLike` is stateful (just like a `Channel`), so you can't iterate over it +twice. + +The wrapped iterator must support `firstindex(itr)::Int`, `lastindex(itr)::Int` and +`getindex(itr, ::Int)`. +""" +mutable struct ChannelLike{T} + const itr::T + @atomic idx::Int + function ChannelLike(itr::T) where {T} + return new{T}(itr, firstindex(itr) - 1) + end +end + +Base.length(ch::ChannelLike) = length(ch.itr) +Base.eltype(ch::ChannelLike) = eltype(ch.itr) + +function Base.iterate(ch::ChannelLike, ::Nothing = nothing) + this = @atomic ch.idx += 1 + if this <= lastindex(ch.itr) + return (@inbounds(ch.itr[this]), nothing) + else + return nothing + end +end