diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a6782d0..4185e872 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Version 0.5.0 - ![BREAKING][badge-breaking] Within a `@tasks` block, task-local values must from now on be defined via `@local` instead of `@init` (renamed). - ![BREAKING][badge-breaking] The (already deprecated) `SpawnAllScheduler` has been dropped. - ![BREAKING][badge-breaking] The default value for `ntasks`/`nchunks` for `DynamicScheduler` has been changed from `2*nthreads()` to `nthreads()`. With the new value we now align with `@threads :dynamic`. The old value wasn't giving good load balancing anyways and choosing a higher value penalizes uniform use cases even more. To get the old behavior, set `nchunks=2*nthreads()`. +- ![Bugfix][badge-bugfix] When using the `GreedyScheduler` in combination with `tmapreduce` (or functions that build upon it) there could be non-deterministic errors in some cases (small input collection, not much work per element, see [#82](https://github.com/JuliaFolds2/OhMyThreads.jl/issues/82)). These cases should be fixed now. Version 0.4.6 ------------- diff --git a/src/implementation.jl b/src/implementation.jl index 5d87ccbe..25a0795b 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -173,6 +173,9 @@ function _tmapreduce(f, mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...) end +# NOTE: once v1.12 releases we should switch this to wait(t; throw=false) +wait_nothrow(t) = Base._wait(t); + # GreedyScheduler function _tmapreduce(f, op, @@ -201,9 +204,25 @@ function _tmapreduce(f, promise_task_local(f)(args...) end end + # Doing this because of https://github.com/JuliaFolds2/OhMyThreads.jl/issues/82 + # The idea is that if the channel gets fully consumed before a task gets started up, + # then if the user does not supply an `init` kwarg, we'll get an error. + # Current way of dealing with this is just filtering out `mapreduce_empty` method + # errors. This may not be the most stable way of dealing with things, e.g. if the + # name of the function throwing the error changes this could break, so long term + # we may want to try a different design. + filtered_tasks = filter(tasks) do stabletask + task = stabletask.t + istaskdone(task) || wait_nothrow(task) + if task.result isa MethodError && task.result.f == Base.mapreduce_empty + false + else + true + end + end # Note, calling `promise_task_local` here is only safe because we're assuming that # Base.mapreduce isn't going to magically try to do multithreading on us... - mapreduce(fetch, promise_task_local(op), tasks; mapreduce_kwargs...) + mapreduce(fetch, promise_task_local(op), filtered_tasks; mapreduce_kwargs...) end function check_all_have_same_indices(Arrs)