quick and dirty (only tmapreduce)
carstenbauer committed Mar 11, 2024
1 parent ff9ff82 commit 4719bb7
Showing 2 changed files with 96 additions and 9 deletions.
37 changes: 35 additions & 2 deletions src/implementation.jl
@@ -152,12 +152,12 @@ function _tmapreduce(f,
mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...)
end

# GreedyScheduler
# GreedyScheduler w/o chunking
function _tmapreduce(f,
op,
Arrs,
::Type{OutputType},
scheduler::GreedyScheduler,
scheduler::GreedyScheduler{NoChunking},
mapreduce_kwargs)::OutputType where {OutputType}
ntasks_desired = scheduler.ntasks
if Base.IteratorSize(first(Arrs)) isa Base.SizeUnknown
@@ -185,6 +185,39 @@ function _tmapreduce(f,
mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...)
end

# GreedyScheduler w/ chunking
function _tmapreduce(f,
op,
Arrs,
::Type{OutputType},
scheduler::GreedyScheduler,
mapreduce_kwargs)::OutputType where {OutputType}
if Base.IteratorSize(first(Arrs)) isa Base.SizeUnknown
throw(ArgumentError("SizeUnknown iterators in combination with a greedy scheduler and chunking are currently not supported."))
end
check_all_have_same_indices(Arrs)
chnks = _chunks(scheduler, first(Arrs))
ntasks_desired = scheduler.ntasks
ntasks = min(length(chnks), ntasks_desired)

ch = Channel{typeof(first(chnks))}(length(chnks); spawn = true) do ch
for args in chnks
put!(ch, args)
end
end
tasks = map(1:ntasks) do _
# Note, calling `promise_task_local` here is only safe because we're assuming that
# Base.mapreduce isn't going to magically try to do multithreading on us...
@spawn mapreduce(promise_task_local(op), ch; mapreduce_kwargs...) do inds
args = map(A -> view(A, inds), Arrs)
mapreduce(promise_task_local(f), promise_task_local(op), args...)
end
end
# Note, calling `promise_task_local` here is only safe because we're assuming that
# Base.mapreduce isn't going to magically try to do multithreading on us...
mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...)
end

function check_all_have_same_indices(Arrs)
let A = first(Arrs), Arrs = Arrs[2:end]
if !all(B -> eachindex(A) == eachindex(B), Arrs)
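The chunked greedy `_tmapreduce` above boils down to a simple pattern: partition the input indices into chunks, push the chunks into a `Channel`, and let a fixed number of spawned tasks greedily pull chunks and reduce them. Below is a minimal, self-contained sketch of that pattern in plain Julia; it is an editorial illustration, not the package's internal API, and `greedy_chunked_mapreduce` with its keyword defaults is made up for this example.

using Base.Threads: @spawn, nthreads

function greedy_chunked_mapreduce(f, op, A; ntasks = nthreads(), nchunks = 4 * nthreads())
    # Partition the indices of A into roughly `nchunks` contiguous chunks.
    chnks = collect(Iterators.partition(eachindex(A), cld(length(A), nchunks)))
    # Feed the chunks through a channel; the tasks below pull from it greedily.
    ch = Channel{eltype(chnks)}(length(chnks); spawn = true) do ch
        foreach(inds -> put!(ch, inds), chnks)
    end
    tasks = map(1:min(ntasks, length(chnks))) do _
        @spawn begin
            acc = nothing  # a task may receive zero chunks, so start empty
            for inds in ch
                part = mapreduce(f, op, view(A, inds))
                acc = acc === nothing ? part : op(acc, part)
            end
            acc
        end
    end
    # Combine the per-task partial results. The order is non-deterministic,
    # so `op` must be commutative, as the docstring below also requires.
    reduce(op, filter(!isnothing, map(fetch, tasks)))
end

greedy_chunked_mapreduce(sin, +, 1:10_000)  # ≈ sum(sin, 1:10_000)

Because tasks take chunks from the channel as they become free, a slow chunk does not stall the remaining threads, which is the load-balancing benefit the docstring below refers to.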
68 changes: 61 additions & 7 deletions src/schedulers.jl
@@ -191,7 +191,7 @@ end

"""
A greedy dynamic scheduler. The elements of the collection are first put into a `Channel`
and then dynamic, non-sticky tasks are spawned to process channel content in parallel.
and then dynamic, non-sticky tasks are spawned to process the channel content in parallel.
Note that elements are processed in a non-deterministic order, and thus a potential reducing
function **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in
@@ -204,20 +204,74 @@ some additional overhead.
- `ntasks::Int` (default `nthreads()`):
* Determines the number of parallel tasks to be spawned.
* Setting `nchunks < nthreads()` is an effective way to use only a subset of the available threads.
* Setting `ntasks < nthreads()` is an effective way to use only a subset of the available threads.
- `chunking::Bool` (default `false`):
* Controls whether input elements are grouped into chunks (`true`) or not (`false`) before being put into the channel. This can improve performance, especially if there are many iterations, each of which is computationally cheap.
* If `nchunks` or `chunksize` are explicitly specified, `chunking` will be automatically set to `true`.
- `nchunks::Integer` (default `4 * nthreads()`):
* Determines the number of chunks (that will eventually be put into the channel).
* Increasing `nchunks` can help with [load balancing](https://en.wikipedia.org/wiki/Load_balancing_(computing)). For `nchunks <= nthreads()` there are not enough chunks for any load balancing.
- `chunksize::Integer` (default not set):
* Specifies the desired chunk size (instead of the number of chunks).
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer).
- `split::Symbol` (default `:scatter`):
* Determines how the collection is divided into chunks (if `chunking=true`).
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options.
"""
Base.@kwdef struct GreedyScheduler <: Scheduler
ntasks::Int = nthreads()
struct GreedyScheduler{C <: ChunkingMode} <: Scheduler
ntasks::Int
nchunks::Int
chunksize::Int
split::Symbol

function GreedyScheduler(ntasks::Int)
function GreedyScheduler(ntasks::Int, nchunks::Integer, chunksize::Integer,
split::Symbol; chunking::Bool = false)
ntasks > 0 || throw(ArgumentError("ntasks must be a positive integer"))
new(ntasks)
if !chunking
C = NoChunking
else
if !(nchunks > 0 || chunksize > 0)
throw(ArgumentError("Either nchunks or chunksize must be a positive integer (or chunking=false)."))
end
if nchunks > 0 && chunksize > 0
throw(ArgumentError("nchunks and chunksize are mutually exclusive and only one of them may be a positive integer"))
end
C = chunksize > 0 ? FixedSize : FixedCount
end
new{C}(ntasks, nchunks, chunksize, split)
end
end

function GreedyScheduler(;
ntasks::Integer = nthreads(),
nchunks::Union{Integer, Nothing} = nothing,
chunksize::Union{Integer, Nothing} = nothing,
chunking::Bool = false,
split::Symbol = :scatter)
if !isnothing(nchunks) || !isnothing(chunksize)
chunking = true
end
if !chunking
nchunks = -1
chunksize = -1
else
# only choose nchunks default if chunksize hasn't been specified
if isnothing(nchunks) && isnothing(chunksize)
nchunks = 4 * nthreads(:default)
chunksize = -1
else
nchunks = isnothing(nchunks) ? -1 : nchunks
chunksize = isnothing(chunksize) ? -1 : chunksize
end
end
GreedyScheduler(ntasks, nchunks, chunksize, split; chunking)
end
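
# How the keyword constructor above resolves its chunking settings, as a quick
# illustration derived from the logic in this diff (these comment lines are an
# editorial sketch, not part of the commit):
#
#   GreedyScheduler()                      -> GreedyScheduler{NoChunking}  (chunking disabled)
#   GreedyScheduler(; chunking = true)     -> GreedyScheduler{FixedCount}  (nchunks = 4 * nthreads())
#   GreedyScheduler(; nchunks = 16)        -> GreedyScheduler{FixedCount}  (chunking implied)
#   GreedyScheduler(; chunksize = 10_000)  -> GreedyScheduler{FixedSize}   (chunking implied)
#   GreedyScheduler(; nchunks = 16, chunksize = 10_000) -> ArgumentError (mutually exclusive)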

function Base.show(io::IO, mime::MIME{Symbol("text/plain")}, s::GreedyScheduler)
print(io, "GreedyScheduler", "\n")
println(io, "├ Num. tasks: ", s.ntasks)
cstr = _chunkingstr(s)
println(io, "├ Chunking: ", cstr)
print(io, "└ Threadpool: default")
end

@@ -232,7 +286,7 @@ end
chunking_mode(s::Scheduler) = chunking_mode(typeof(s))
chunking_mode(::Type{DynamicScheduler{C}}) where {C} = C
chunking_mode(::Type{StaticScheduler{C}}) where {C} = C
chunking_mode(::Type{GreedyScheduler}) = NoChunking
chunking_mode(::Type{GreedyScheduler{C}}) where {C} = C
chunking_mode(::Type{SerialScheduler}) = NoChunking

chunking_enabled(s::Scheduler) = chunking_enabled(typeof(s))
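Taken together, the two files let users opt into chunking for the greedy scheduler. A hedged usage sketch follows; it assumes the package's user-facing `tmapreduce(f, op, itr; scheduler = ...)` entry point, which is not shown in this diff.

using OhMyThreads: tmapreduce, GreedyScheduler

# Previous behavior: no chunking, one channel element per input element.
s_plain = GreedyScheduler(; ntasks = 4)

# New in this commit: group input elements into chunks before they are put into the channel.
s_count = GreedyScheduler(; nchunks = 32)       # implies chunking = true
s_size  = GreedyScheduler(; chunksize = 1_000)  # implies chunking = true

# The reducing operator must be commutative, because the greedy scheduler
# processes elements (or chunks) in a non-deterministic order.
tmapreduce(sin, +, 1:100_000; scheduler = s_count)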
