From 2702ac4e4e5871a4bde5a152ce4a6667c03ebb03 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 18 Sep 2024 08:31:11 +0200 Subject: [PATCH 1/3] tmapreduce for enumerate(chunks(...)) --- src/implementation.jl | 4 ++-- test/runtests.jl | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/implementation.jl b/src/implementation.jl index 2e6ad5c..d9e3791 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -105,11 +105,11 @@ end # DynamicScheduler: ChunkSplitters.Chunk function _tmapreduce(f, op, - Arrs::Tuple{ChunkSplitters.Chunk{T}}, # we don't support multiple chunks for now + Arrs::Union{Tuple{ChunkSplitters.Chunk{T}}, Tuple{ChunkSplitters.Enumerate{T}}}, ::Type{OutputType}, scheduler::DynamicScheduler, mapreduce_kwargs)::OutputType where {OutputType, T} - (; nchunks, split, threadpool) = scheduler + (; threadpool) = scheduler chunking_enabled(scheduler) && auto_disable_chunking_warning() tasks = map(only(Arrs)) do idcs @spawn threadpool promise_task_local(f)(idcs) diff --git a/test/runtests.jl b/test/runtests.jl index ee28d76..e638404 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -83,6 +83,15 @@ end; @test isnothing(tforeach(x -> sin.(x), chnks; scheduler)) end end + + # enumerate(chunks) + data = 1:100 + @test tmapreduce(+, enumerate(chunks(data; n=5)); chunking=false) do (i, idcs) + [i, sum(@view(data[idcs]))] + end == [sum(1:5), sum(data)] + @test tmapreduce(+, enumerate(chunks(data; size=5)); chunking=false) do (i, idcs) + [i, sum(@view(data[idcs]))] + end == [sum(1:20), sum(data)] end; @testset "macro API" begin @@ -246,6 +255,18 @@ end; @set reducer = + C.x end) == 10 * var + + # enumerate(chunks) + data = collect(1:100) + @test @tasks(for (i, idcs) in enumerate(chunks(data; n=5)) + @set reducer = + + @set chunking = false + [i, sum(@view(data[idcs]))] + end) == [sum(1:5), sum(data)] + @test @tasks(for (i, idcs) in enumerate(chunks(data; size=5)) + @set reducer = + + [i, sum(@view(data[idcs]))] + end) == [sum(1:20), sum(data)] end; @testset "WithTaskLocals" begin From 66bc2926b455467a3bdbb6229431bb8779baeb23 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 18 Sep 2024 08:42:16 +0200 Subject: [PATCH 2/3] tmap for enumerate(chunks(...)) --- src/implementation.jl | 7 ++++--- test/runtests.jl | 8 ++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/implementation.jl b/src/implementation.jl index d9e3791..cd7227c 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -320,7 +320,7 @@ function tmap(f, ::Type{T}, A::AbstractArray, _Arrs::AbstractArray...; kwargs... end function tmap(f, - A::Union{AbstractArray, ChunkSplitters.Chunk}, + A::Union{AbstractArray, ChunkSplitters.Chunk, ChunkSplitters.Enumerate}, _Arrs::AbstractArray...; scheduler::MaybeScheduler = NotGiven(), kwargs...) @@ -333,7 +333,8 @@ function tmap(f, _scheduler.split != :batch error("Only `split == :batch` is supported because the parallel operation isn't commutative. (Scheduler: $_scheduler)") end - if A isa ChunkSplitters.Chunk && chunking_enabled(_scheduler) + if (A isa ChunkSplitters.Chunk || A isa ChunkSplitters.Enumerate) && + chunking_enabled(_scheduler) auto_disable_chunking_warning() if _scheduler isa DynamicScheduler _scheduler = DynamicScheduler(; @@ -377,7 +378,7 @@ end # w/o chunking (DynamicScheduler{NoChunking}): ChunkSplitters.Chunk function _tmap(scheduler::DynamicScheduler{NoChunking}, f, - A::ChunkSplitters.Chunk, + A::Union{ChunkSplitters.Chunk, ChunkSplitters.Enumerate}, _Arrs::AbstractArray...) (; threadpool) = scheduler tasks = map(A) do idcs diff --git a/test/runtests.jl b/test/runtests.jl index e638404..2fcd00b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -92,6 +92,9 @@ end; @test tmapreduce(+, enumerate(chunks(data; size=5)); chunking=false) do (i, idcs) [i, sum(@view(data[idcs]))] end == [sum(1:20), sum(data)] + @test tmap(enumerate(chunks(data; n=5)); chunking=false) do (i, idcs) + [i, idcs] + end == [[1, 1:20], [2, 21:40], [3, 41:60], [4, 61:80], [5, 81:100]] end; @testset "macro API" begin @@ -267,6 +270,11 @@ end; @set reducer = + [i, sum(@view(data[idcs]))] end) == [sum(1:20), sum(data)] + @test @tasks(for (i, idcs) in enumerate(chunks(1:100; n=5)) + @set chunking=false + @set collect=true + [i, idcs] + end) == [[1, 1:20], [2, 21:40], [3, 41:60], [4, 61:80], [5, 81:100]] end; @testset "WithTaskLocals" begin From 9f7227390def1678ed6b323ce6d4900fc275df7c Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Wed, 18 Sep 2024 08:50:23 +0200 Subject: [PATCH 3/3] fix --- CHANGELOG.md | 7 +++++++ test/runtests.jl | 12 ++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b7e942f..7d536c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,13 @@ OhMyThreads.jl Changelog ========================= +Version 0.6.2 +------------- +- ![Enhancement][badge-enhancement] Added API support for `enumerate(chunks(...))`. Best used in combination with `chunking=false`. + +Version 0.6.1 +------------- + Version 0.6.0 ------------- - ![BREAKING][badge-breaking] Drop support for Julia < 1.10. diff --git a/test/runtests.jl b/test/runtests.jl index 2fcd00b..05f0be9 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -86,13 +86,13 @@ end; # enumerate(chunks) data = 1:100 - @test tmapreduce(+, enumerate(chunks(data; n=5)); chunking=false) do (i, idcs) + @test tmapreduce(+, enumerate(OhMyThreads.chunks(data; n=5)); chunking=false) do (i, idcs) [i, sum(@view(data[idcs]))] end == [sum(1:5), sum(data)] - @test tmapreduce(+, enumerate(chunks(data; size=5)); chunking=false) do (i, idcs) + @test tmapreduce(+, enumerate(OhMyThreads.chunks(data; size=5)); chunking=false) do (i, idcs) [i, sum(@view(data[idcs]))] end == [sum(1:20), sum(data)] - @test tmap(enumerate(chunks(data; n=5)); chunking=false) do (i, idcs) + @test tmap(enumerate(OhMyThreads.chunks(data; n=5)); chunking=false) do (i, idcs) [i, idcs] end == [[1, 1:20], [2, 21:40], [3, 41:60], [4, 61:80], [5, 81:100]] end; @@ -261,16 +261,16 @@ end; # enumerate(chunks) data = collect(1:100) - @test @tasks(for (i, idcs) in enumerate(chunks(data; n=5)) + @test @tasks(for (i, idcs) in enumerate(OhMyThreads.chunks(data; n=5)) @set reducer = + @set chunking = false [i, sum(@view(data[idcs]))] end) == [sum(1:5), sum(data)] - @test @tasks(for (i, idcs) in enumerate(chunks(data; size=5)) + @test @tasks(for (i, idcs) in enumerate(OhMyThreads.chunks(data; size=5)) @set reducer = + [i, sum(@view(data[idcs]))] end) == [sum(1:20), sum(data)] - @test @tasks(for (i, idcs) in enumerate(chunks(1:100; n=5)) + @test @tasks(for (i, idcs) in enumerate(OhMyThreads.chunks(1:100; n=5)) @set chunking=false @set collect=true [i, idcs]