From 37837abe9eebcca648903475d85e8ecd4238ab25 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Mon, 18 Mar 2024 18:23:33 +0100 Subject: [PATCH] critical section --- CHANGELOG.md | 4 + docs/src/refs/api.md | 1 + src/OhMyThreads.jl | 2 +- src/macro_impl.jl | 42 ++++++++- src/macros.jl | 32 +++++++ test/runtests.jl | 214 +++++++++++++++++++++++++++---------------- 6 files changed, 212 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 466b3226..f4b0a3d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ OhMyThreads.jl Changelog ========================= +Version 0.5.1 +------------- +- ![Feature][badge-feature] Within a `@tasks` block one can now mark a section as "critical" via `@section :critical begin ... end`. + Version 0.5.0 ------------- diff --git a/docs/src/refs/api.md b/docs/src/refs/api.md index 2e7b494a..bb24161f 100644 --- a/docs/src/refs/api.md +++ b/docs/src/refs/api.md @@ -11,6 +11,7 @@ CollapsedDocStrings = true @tasks @set @local +@section ``` ### Functions diff --git a/src/OhMyThreads.jl b/src/OhMyThreads.jl index cf8da9a4..39cf8faa 100644 --- a/src/OhMyThreads.jl +++ b/src/OhMyThreads.jl @@ -20,7 +20,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler SerialScheduler include("implementation.jl") -export @tasks, @set, @local +export @tasks, @set, @local, @section export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler diff --git a/src/macro_impl.jl b/src/macro_impl.jl index b4e74a58..235e0f76 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -20,6 +20,7 @@ function tasks_macro(forex) locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args) tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before) _maybe_handle_atset_block!(settings, forbody.args) + setup_sections = _maybe_handle_atsection_blocks!(forbody.args) forbody = esc(forbody) itrng = esc(itrng) @@ -39,6 +40,7 @@ function tasks_macro(forex) end q = if isgiven(settings.reducer) quote + $setup_sections $make_mapping_function tmapreduce(mapping_function, $(settings.reducer), $(itrng)) @@ -46,12 +48,14 @@ function tasks_macro(forex) elseif isgiven(settings.collect) maybe_warn_useless_init(settings) quote + $setup_sections $make_mapping_function tmap(mapping_function, $(itrng)) end else maybe_warn_useless_init(settings) quote + $setup_sections $make_mapping_function tforeach(mapping_function, $(itrng)) end @@ -68,7 +72,7 @@ function tasks_macro(forex) for (k, v) in settings.kwargs push!(kwexpr.args, Expr(:kw, k, v)) end - insert!(q.args[4].args, 2, kwexpr) + insert!(q.args[6].args, 2, kwexpr) # wrap everything in a let ... end block # and, potentially, define the `TaskLocalValue`s. @@ -151,16 +155,15 @@ function _atlocal_assign_to_exprs(ex) tls_type = esc(left_ex.args[2]) local_before = :($(tl_storage) = TaskLocalValue{$tls_type}(() -> $(tls_def))) else - tls_sym = esc(left_ex) + tls_sym = esc(left_ex) local_before = :($(tl_storage) = let f = () -> $(tls_def) - TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f) - end) + TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f) + end) end local_name = :($(tls_sym)) return local_before, local_name end - function _maybe_handle_atset_block!(settings, args) idcs = findall(args) do arg arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set") @@ -201,3 +204,32 @@ function _handle_atset_single_assign!(settings, ex) push!(settings.kwargs, sym => esc(def)) end end + +function _maybe_handle_atsection_blocks!(args) + idcs = findall(args) do arg + arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@section") + end + isnothing(idcs) && return # no @section blocks + setup_sections = quote end + for i in idcs + kind = args[i].args[3] + body = args[i].args[4] + if kind isa QuoteNode + if kind.value == :critical + @gensym critical_lock + init_lock_ex = esc(:($(critical_lock) = $(ReentrantLock()))) + push!(setup_sections.args, init_lock_ex) + args[i] = quote + lock($(critical_lock)) do + $(body) + end + end + else + throw(ErrorException("Unknown section kind :$(kind.value).")) + end + else + throw(ErrorException("Wrong usage of @section. Must be followed by a symbol, indicating the kind of section.")) + end + end + return setup_sections +end diff --git a/src/macros.jl b/src/macros.jl index 2c868144..844bed1f 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -151,3 +151,35 @@ end error("The @local macro may only be used inside of a @tasks block.") end end + +""" + @section kind begin ... end + +This can be used inside a `@tasks for ... end` block to identify a section of code that +must be executed according to a specific synchronization policy +(as indicated by the symbol `kind`, see below). + +Multiple `@section` blocks are supported. + +## Kinds + +* `:critical`: Section of code that must be executed by a single task at a time (arbitrary order). + +## Example + +```julia +@tasks for i in 1:10 + @set ntasks = 10 + + println(i, ": before") + @section :critical begin + println(i, ": critical") + sleep(1) + end + println(i, ": after") +end +``` +""" +macro section(args...) + error("The @section macro may only be used inside of a @tasks block.") +end diff --git a/test/runtests.jl b/test/runtests.jl index 973a1355..4fe17be4 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -10,19 +10,21 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +, itrs = ([1 => "a", 2 => "b", 3 => "c", 4 => "d", 5 => "e"],), init = "")] -ChunkedGreedy(;kwargs...) = GreedyScheduler(;kwargs...) +ChunkedGreedy(; kwargs...) = GreedyScheduler(; kwargs...) @testset "Basics" begin for (; ~, f, op, itrs, init) in sets_to_test @testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin @testset for sched in ( - StaticScheduler, DynamicScheduler, GreedyScheduler, DynamicScheduler{OhMyThreads.Schedulers.NoChunking}, SerialScheduler, ChunkedGreedy) + StaticScheduler, DynamicScheduler, GreedyScheduler, + DynamicScheduler{OhMyThreads.Schedulers.NoChunking}, + SerialScheduler, ChunkedGreedy) @testset for split in (:batch, :scatter) for nchunks in (1, 2, 6) if sched == GreedyScheduler scheduler = sched(; ntasks = nchunks) elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking} - scheduler = DynamicScheduler(; chunking=false) + scheduler = DynamicScheduler(; chunking = false) elseif sched == SerialScheduler scheduler = SerialScheduler() else @@ -30,7 +32,8 @@ ChunkedGreedy(;kwargs...) = GreedyScheduler(;kwargs...) end kwargs = (; scheduler) - if (split == :scatter || sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *) + if (split == :scatter || + sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *) # scatter and greedy only works for commutative operators! else mapreduce_f_op_itr = mapreduce(f, op, itrs...) @@ -66,7 +69,8 @@ end; @testset "ChunkSplitters.Chunk" begin x = rand(100) chnks = OhMyThreads.chunks(x; n = Threads.nthreads()) - for scheduler in (DynamicScheduler(; chunking=false), StaticScheduler(; chunking=false)) + for scheduler in ( + DynamicScheduler(; chunking = false), StaticScheduler(; chunking = false)) @testset "$scheduler" begin @test tmap(x -> sin.(x), chnks; scheduler) ≈ map(x -> sin.(x), chnks) @test tmapreduce(x -> sin.(x), vcat, chnks; scheduler) ≈ @@ -86,106 +90,106 @@ end; # reduction @test @tasks(for i in 1:3 - @set reducer=(+) + @set reducer = (+) i end) == 6 # scheduler settings for sched in (StaticScheduler(), DynamicScheduler(), GreedyScheduler()) @test @tasks(for i in 1:3 - @set scheduler=sched + @set scheduler = sched i end) |> isnothing end # scheduler settings as symbols @test @tasks(for i in 1:3 - @set scheduler=:static + @set scheduler = :static i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic + @set scheduler = :dynamic i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:greedy + @set scheduler = :greedy i end) |> isnothing # @set begin ... end @test @tasks(for i in 1:10 @set begin - scheduler=StaticScheduler() - reducer=(+) + scheduler = StaticScheduler() + reducer = (+) end i end) == 55 # multiple @set @test @tasks(for i in 1:10 - @set scheduler=StaticScheduler() + @set scheduler = StaticScheduler() i - @set reducer=(+) + @set reducer = (+) end) == 55 # @set init @test @tasks(for i in 1:10 @set begin - reducer=(+) - init=0.0 + reducer = (+) + init = 0.0 end i end) === 55.0 @test @tasks(for i in 1:10 @set begin - reducer=(+) - init=0.0*im + reducer = (+) + init = 0.0 * im end i end) === (55.0 + 0.0im) # top-level "kwargs" @test @tasks(for i in 1:3 - @set scheduler=:static - @set ntasks=1 + @set scheduler = :static + @set ntasks = 1 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:static - @set nchunks=2 + @set scheduler = :static + @set nchunks = 2 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic - @set chunksize=2 + @set scheduler = :dynamic + @set chunksize = 2 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic - @set chunking=false + @set scheduler = :dynamic + @set chunking = false i end) |> isnothing @test_throws ArgumentError @tasks(for i in 1:3 - @set scheduler=DynamicScheduler() - @set chunking=false + @set scheduler = DynamicScheduler() + @set chunking = false i end) @test_throws MethodError @tasks(for i in 1:3 - @set scheduler=:serial - @set chunking=false + @set scheduler = :serial + @set chunking = false i end) @test_throws MethodError @tasks(for i in 1:3 - @set scheduler=:dynamic - @set asd=123 + @set scheduler = :dynamic + @set asd = 123 i end) # TaskLocalValue - ntd = 2*Threads.nthreads() + ntd = 2 * Threads.nthreads() ptrs = Vector{Ptr{Nothing}}(undef, ntd) tids = Vector{UInt64}(undef, ntd) tid() = OhMyThreads.Tools.taskid() @test @tasks(for i in 1:ntd @local C::Vector{Float64} = rand(3) - @set scheduler=:static + @set scheduler = :static ptrs[i] = pointer_from_objref(C) tids[i] = tid() end) |> isnothing @@ -217,17 +221,17 @@ end; sched = StaticScheduler() sched_sym = :static data = rand(10) - red = (a,b) -> a+b + red = (a, b) -> a + b n = 2 @test @tasks(for d in data - @set scheduler=sched - @set reducer=red + @set scheduler = sched + @set reducer = red var * d end) ≈ var * sum(data) @test @tasks(for d in data - @set scheduler=sched_sym - @set ntasks=n - @set reducer=red + @set scheduler = sched_sym + @set ntasks = n + @set reducer = red var * d end) ≈ var * sum(data) @@ -236,13 +240,14 @@ end; end @test @tasks(for _ in 1:10 @local C = SingleInt(var) - @set reducer=+ + @set reducer = + C.x - end) == 10*var + end) == 10 * var end; @testset "WithTaskLocals" begin - let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)), y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)) + let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)), + y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)) # Equivalent to # function f() # x[][] += 1 @@ -259,7 +264,7 @@ end; # Make sure we can call `f` like a regular function @test f() == (1, 1) @test f() == (2, 2) - @test @fetch( f() ) == (1, 1) + @test @fetch(f()) == (1, 1) # Acceptable use of promise_task_local @test @fetch(promise_task_local(f)()) == (1, 1) # Acceptable use of promise_task_local @@ -283,23 +288,32 @@ end; @testset "chunking mode + chunksize option" begin for sched in (DynamicScheduler, StaticScheduler) @test sched() isa sched - @test sched(; chunksize=2) isa sched + @test sched(; chunksize = 2) isa sched - @test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount - @test OhMyThreads.Schedulers.chunking_mode(sched(; chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2, chunksize=4, chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=-2, chunksize=-4, split=:whatever, chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=-2, chunksize=-4, chunking=false)) == false - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2, chunksize=4, chunking=false)) == false + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize = 2)) == + OhMyThreads.Schedulers.FixedSize + @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks = 2)) == + OhMyThreads.Schedulers.FixedCount + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; + nchunks = 2, chunksize = 4, chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; + nchunks = -2, chunksize = -4, split = :whatever, chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize = 2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks = 2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; + nchunks = -2, chunksize = -4, chunking = false)) == false + @test OhMyThreads.Schedulers.chunking_enabled(sched(; + nchunks = 2, chunksize = 4, chunking = false)) == false - @test_throws ArgumentError sched(; nchunks=2, chunksize=3) - @test_throws ArgumentError sched(; nchunks=0, chunksize=0) - @test_throws ArgumentError sched(; nchunks=-2, chunksize=-3) + @test_throws ArgumentError sched(; nchunks = 2, chunksize = 3) + @test_throws ArgumentError sched(; nchunks = 0, chunksize = 0) + @test_throws ArgumentError sched(; nchunks = -2, chunksize = -3) - let scheduler = sched(; chunksize=2) + let scheduler = sched(; chunksize = 2) @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) @test isnothing(tforeach(sin, 1:10; scheduler)) @@ -312,41 +326,46 @@ end; res_tmr = mapreduce(sin, +, 1:10000) # scheduler not given - @test tmapreduce(sin, +, 1:10000; ntasks=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; nchunks=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; split=:scatter) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunksize=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunking=false) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; split = :scatter) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunksize = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking = false) ≈ res_tmr # scheduler isa Scheduler - @test tmapreduce(sin, +, 1:10000; scheduler=StaticScheduler()) ≈ res_tmr - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=DynamicScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=DynamicScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; split=:scatter, scheduler=StaticScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=SerialScheduler()) + @test tmapreduce(sin, +, 1:10000; scheduler = StaticScheduler()) ≈ res_tmr + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 2, scheduler = DynamicScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; chunksize = 2, scheduler = DynamicScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; split = :scatter, scheduler = StaticScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 3, scheduler = SerialScheduler()) # scheduler isa Symbol for s in (:dynamic, :static, :serial, :greedy) - @test tmapreduce(sin, +, 1:10000; scheduler=s, init=0.0) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; scheduler = s, init = 0.0) ≈ res_tmr end for s in (:dynamic, :static, :greedy) - @test tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=s, init=0.0) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 2, scheduler = s, init = 0.0) ≈ res_tmr end for s in (:dynamic, :static) - @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunking=false, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; nchunks=3, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=s) ≈ res_tmr - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, nchunks=2, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunksize = 2, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking = false, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks = 3, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 3, scheduler = s) ≈ res_tmr + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 3, nchunks = 2, scheduler = s)≈res_tmr end end; @testset "empty collections" begin for empty_coll in (11:9, Float64[]) - for f in (sin, x->im*x, identity) + for f in (sin, x -> im * x, identity) for op in (+, *, min) @test_throws ArgumentError tmapreduce(f, op, empty_coll) - for init in (0.0, 0, 0.0*im, 0f0) + for init in (0.0, 0, 0.0 * im, 0.0f0) @test tmapreduce(f, op, empty_coll; init) == init end @test tforeach(f, empty_coll) |> isnothing @@ -355,4 +374,45 @@ end; end end; +# for testing @section :critical +mutable struct SingleAccessOnly + in_use::Bool + const lck::ReentrantLock + SingleAccessOnly() = new(false, ReentrantLock()) +end +function acquire(f, o::SingleAccessOnly) + lock(o.lck) do + o.in_use && throw(ErrorException("Already in use!")) + o.in_use = true + end + try + f() + finally + lock(o.lck) do + !o.in_use && throw(ErrorException("Conflict!")) + o.in_use = false + end + end +end + +@testset "sections" begin + @testset ":critical" begin + sao = SingleAccessOnly() + try + @tasks for i in 1:10 + @set ntasks = 10 + @section :critical begin + acquire(sao) do + sleep(0.01) + end + end + end + catch ErrorException + @test false + else + @test true + end + end +end; + # Todo way more testing, and easier tests to deal with