Skip to content


Add ChannelLike iterator wrapper
Browse files Browse the repository at this point in the history
`ChannelLike` wraps an indexable object such that it can be iterated by
concurrent tasks in a safe manner similar to a `Channel`. This is
instead of `Channel` in the chunked `GreedyScheduler`.
  • Loading branch information
fredrikekre committed Sep 27, 2024
1 parent 4b91f66 commit 4bc8088
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 8 deletions.
3 changes: 3 additions & 0 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Version 0.7.0
- ![BREAKING][badge-breaking] If you provide a `chunks` or `index_chunks` as input we now disable the internal chunking without a warning. Previously, we did show a warning unless you had set `chunking=false`. In contrast, we now throw an error when you set any incompatible chunking related keyword arguments.
- ![Deprecation][badge-deprecation] The `split` options `:batch` and `:scatter` are now deprecated (they still work but will be dropped at some point). Use `:consecutive` and `:roundrobin`, respectively, instead.
- ![Enhancement][badge-enhancement] The `split` keyword argument can now also be a `<: OhMyThreads.Split`. Compared to providing a `Symbol`, the former can potentially give better performance. For example, you can replace `:consecutive` by `OhMyThreads.Consecutive()` and `:roundrobin` by `OhMyThreads.RoundRobin()`.
- ![Feature][badge-feature] `ChannelLike` is a new public (but not exported) type. `ChannelLike(itr)` provide a way to iterate over `itr` in a concurrency safe manner similar to `Channel`. See the docstring for more details. ([#121][gh-pr-121])
- ![Enhancement][badge-enhancement] `ChannelLike` is used internally for the `GreedyScheduler` when `chunking=true`. This improves performance overall but it is especially noticeable when the number of chunks is large. ([#121][gh-pr-121])

Version 0.6.2
Expand Down Expand Up @@ -136,3 +138,4 @@ Version 0.2.0

1 change: 1 addition & 0 deletions docs/src/refs/
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ SerialScheduler
11 changes: 5 additions & 6 deletions src/implementation.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Implementation

import OhMyThreads: treduce, tmapreduce, treducemap, tforeach, tmap, tmap!, tcollect
using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local
using OhMyThreads: @spawn, @spawnat, WithTaskLocals, promise_task_local, ChannelLike
using OhMyThreads.Tools: nthtid
using OhMyThreads: Scheduler,
DynamicScheduler, StaticScheduler, GreedyScheduler,
Expand Down Expand Up @@ -207,6 +207,7 @@ function _tmapreduce(f,
ntasks = min(length(first(Arrs)), ntasks_desired)
ch_len = length(first(Arrs))
# TODO: Use ChannelLike for iterators that support it. Dispatch on IndexLinear?
ch = Channel{Tuple{eltype.(Arrs)...}}(ch_len; spawn = true) do ch
for args in zip(Arrs...)
put!(ch, args)
Expand Down Expand Up @@ -255,11 +256,9 @@ function _tmapreduce(f,
ntasks_desired = scheduler.ntasks
ntasks = min(length(chnks), ntasks_desired)

ch = Channel{typeof(first(chnks))}(length(chnks); spawn = true) do ch
for args in chnks
put!(ch, args)
# ChunkSplitters.IndexChunks support everything needed for ChannelLike
ch = ChannelLike(chnks)

tasks = map(1:ntasks) do _
# Note, calling `promise_task_local` here is only safe because we're assuming that
# Base.mapreduce isn't going to magically try to do multithreading on us...
Expand Down
5 changes: 3 additions & 2 deletions src/schedulers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,9 @@ end
GreedyScheduler (aka :greedy)
A greedy dynamic scheduler. The elements of the collection are first put into a `Channel`
and then dynamic, non-sticky tasks are spawned to process the channel content in parallel.
A greedy dynamic scheduler. Dynamic, non-sticky tasks are spawned to process the elements of
the collection. The collection is shared between all tasks and each task takes the next
element as soon as the previous one is finished.
Note that elements are processed in a non-deterministic order, and thus a potential reducing
function **must** be [commutative]( in
Expand Down
63 changes: 63 additions & 0 deletions src/types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,66 @@ promise_task_local(f::Any) = f
function (f::WithTaskLocals{F})(args...; kwargs...) where {F}
promise_task_local(f)(args...; kwargs...)

This struct wraps an indexable object such that it can be iterated by concurrent tasks in a
safe manner similar to a `Channel`.
`ChannelLike(itr)` is conceptually similar to:
Channel{eltype(itr)}(length(itr)) do ch
foreach(i -> put!(ch, i), itr)
i.e. creating a channel, `put!`ing all elements of `itr` into it and closing it. The
advantage is that `ChannelLike` doesn't copy the data.
# Examples
ch = OhMyThreads.ChannelLike(1:5)
@sync for taskid in 1:2
Threads.@spawn begin
for i in ch
println("Task #\$taskid processing item \$i")
sleep(1 / i)
# output
Task #1 processing item 1
Task #2 processing item 2
Task #2 processing item 3
Task #2 processing item 4
Task #1 processing item 5
Note that `ChannelLike` is stateful (just like a `Channel`), so you can't iterate over it
The wrapped iterator must support `firstindex(itr)::Int`, `lastindex(itr)::Int` and
`getindex(itr, ::Int)`.
mutable struct ChannelLike{T}
const itr::T
@atomic idx::Int
function ChannelLike(itr::T) where {T}
return new{T}(itr, firstindex(itr) - 1)

Base.length(ch::ChannelLike) = length(ch.itr)
Base.eltype(ch::ChannelLike) = eltype(ch.itr)

function Base.iterate(ch::ChannelLike, ::Nothing = nothing)
this = @atomic ch.idx += 1
if this <= lastindex(ch.itr)
return (@inbounds(ch.itr[this]), nothing)
return nothing

0 comments on commit 4bc8088

Please sign in to comment.