Fix bug with greedy scheduler hitting empty collections #84

Merged 4 commits on Mar 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ Version 0.5.0
 - ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed).
 - ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped.
 - ![BREAKING][badge-breaking] The default value for `ntasks`/`nchunks` for `DynamicScheduler` has been changed from `2*nthreads()` to `nthreads()`. With the new value we now align with `@threads :dynamic`. The old value wasn't giving good load balancing anyways and choosing a higher value penalizes uniform use cases even more. To get the old behavior, set `nchunks=2*nthreads()`.
+- ![Bugfix][badge-bugfix] When using the `GreedyScheduler` in combination with `tmapreduce` (or functions that build upon it) there could be non-deterministic errors in some cases (small input collection, not much work per element, see [#82](https://github.com/JuliaFolds2/OhMyThreads.jl/issues/82)). These cases should be fixed now.
 
 Version 0.4.6
 -------------
21 changes: 20 additions & 1 deletion src/implementation.jl
@@ -173,6 +173,9 @@ function _tmapreduce(f,
     mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...)
 end
 
+# NOTE: once v1.12 releases we should switch this to wait(t; throw=false)
+wait_nothrow(t) = Base._wait(t);
+
 # GreedyScheduler
 function _tmapreduce(f,
         op,
@@ -201,9 +204,25 @@ function _tmapreduce(f,
             promise_task_local(f)(args...)
         end
     end
+    # Doing this because of https://github.com/JuliaFolds2/OhMyThreads.jl/issues/82
+    # The idea is that if the channel gets fully consumed before a task gets started up,
+    # then if the user does not supply an `init` kwarg, we'll get an error.
+    # Current way of dealing with this is just filtering out `mapreduce_empty` method
+    # errors. This may not be the most stable way of dealing with things, e.g. if the
+    # name of the function throwing the error changes this could break, so long term
+    # we may want to try a different design.
+    filtered_tasks = filter(tasks) do stabletask
+        task = stabletask.t
+        istaskdone(task) || wait_nothrow(task)
+        if task.result isa MethodError && task.result.f == Base.mapreduce_empty
+            false
+        else
+            true
+        end
+    end
     # Note, calling `promise_task_local` here is only safe because we're assuming that
     # Base.mapreduce isn't going to magically try to do multithreading on us...
-    mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...)
+    mapreduce(fetch, promise_task_local(op), filtered_tasks; mapreduce_kwargs...)
 end
 
 function check_all_have_same_indices(Arrs)
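
The filtering step added to the `GreedyScheduler` method can be illustrated in isolation. The following is a minimal sketch, not the package's implementation. It assumes plain `Task`s instead of the `StableTask` wrapper used in the diff, a hypothetical `EmptyChunk` exception standing in for the `Base.mapreduce_empty` `MethodError`, and a hypothetical `wait_quietly` helper built from `try`/`catch` instead of the internal `Base._wait`. Like the PR, it reads the `result` field of `Task`, which is an implementation detail of Julia rather than documented API.

```julia
using Base.Threads: @spawn

# Hypothetical stand-in for "this worker found nothing left to reduce".
struct EmptyChunk <: Exception end

# Each worker either reduces its chunk or fails with EmptyChunk.
sum_chunk(xs) = isempty(xs) ? throw(EmptyChunk()) : sum(xs)

# Wait for a task without rethrowing its failure (assumed helper, not Base API).
function wait_quietly(t::Task)
    try
        wait(t)
    catch
        # Intentionally ignored: failures are inspected via the task afterwards.
    end
    return nothing
end

chunks = [[1, 2, 3], Int[], [4, 5]]
tasks = [@spawn sum_chunk(c) for c in chunks]

# Keep every task except those that failed with the expected "empty" error.
kept = filter(tasks) do t
    wait_quietly(t)
    !(istaskfailed(t) && t.result isa EmptyChunk)
end

# Any other failure would still surface here, because `fetch` rethrows it.
total = mapreduce(fetch, +, kept)
```

In this sketch `kept` holds the two tasks that had data and `total` is the sum over the non-empty chunks. Only the one anticipated failure is dropped, so unrelated errors still propagate through `fetch`. The sketch matches on a dedicated exception type, whereas the PR matches on the identity of the throwing function, which its own comment flags as potentially fragile.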