diff --git a/CHANGELOG.md b/CHANGELOG.md index 466b3226..c11095f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ OhMyThreads.jl Changelog ========================= +Version 0.5.1 +------------- +- ![Feature][badge-feature] Within a parallel `@tasks` block one can now mark a region with `@one_by_one`. This region will be run by one task at a time ("critical region"). +- ![Feature][badge-feature] Within a `@tasks` block one can now mark a region as with `@only_one`. This region will be run by a single parallel task only (other tasks will skip over it). + Version 0.5.0 ------------- diff --git a/docs/src/refs/api.md b/docs/src/refs/api.md index 2e7b494a..3f9134db 100644 --- a/docs/src/refs/api.md +++ b/docs/src/refs/api.md @@ -11,6 +11,8 @@ CollapsedDocStrings = true @tasks @set @local +@only_one +@one_by_one ``` ### Functions diff --git a/src/OhMyThreads.jl b/src/OhMyThreads.jl index cf8da9a4..c4592a84 100644 --- a/src/OhMyThreads.jl +++ b/src/OhMyThreads.jl @@ -20,7 +20,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler SerialScheduler include("implementation.jl") -export @tasks, @set, @local +export @tasks, @set, @local, @one_by_one, @only_one export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler diff --git a/src/macro_impl.jl b/src/macro_impl.jl index b4e74a58..0ae1ecec 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -1,3 +1,5 @@ +using OhMyThreads.Tools: OnlyOneRegion, try_enter! + function tasks_macro(forex) if forex.head != :for throw(ErrorException("Expected a for loop after `@tasks`.")) @@ -17,11 +19,26 @@ function tasks_macro(forex) settings = Settings() + # Escape everything in the loop body that is not used in conjuction with one of our + # "macros", e.g. @set or @local. Code inside of these macro blocks will be escaped by + # the respective "macro" handling functions below. + for i in findall(forbody.args) do arg + !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")) && + !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@local")) && + !(arg isa Expr && arg.head == :macrocall && + arg.args[1] == Symbol("@only_one")) && + !(arg isa Expr && arg.head == :macrocall && + arg.args[1] == Symbol("@one_by_one")) + end + forbody.args[i] = esc(forbody.args[i]) + end + locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args) tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before) _maybe_handle_atset_block!(settings, forbody.args) + setup_onlyone_blocks = _maybe_handle_atonlyone_blocks!(forbody.args) + setup_onebyone_blocks = _maybe_handle_atonebyone_blocks!(forbody.args) - forbody = esc(forbody) itrng = esc(itrng) itvar = esc(itvar) @@ -39,6 +56,8 @@ function tasks_macro(forex) end q = if isgiven(settings.reducer) quote + $setup_onlyone_blocks + $setup_onebyone_blocks $make_mapping_function tmapreduce(mapping_function, $(settings.reducer), $(itrng)) @@ -46,12 +65,16 @@ function tasks_macro(forex) elseif isgiven(settings.collect) maybe_warn_useless_init(settings) quote + $setup_onlyone_blocks + $setup_onebyone_blocks $make_mapping_function tmap(mapping_function, $(itrng)) end else maybe_warn_useless_init(settings) quote + $setup_onlyone_blocks + $setup_onebyone_blocks $make_mapping_function tforeach(mapping_function, $(itrng)) end @@ -68,7 +91,7 @@ function tasks_macro(forex) for (k, v) in settings.kwargs push!(kwexpr.args, Expr(:kw, k, v)) end - insert!(q.args[4].args, 2, kwexpr) + insert!(q.args[8].args, 2, kwexpr) # wrap everything in a let ... end block # and, potentially, define the `TaskLocalValue`s. @@ -151,16 +174,15 @@ function _atlocal_assign_to_exprs(ex) tls_type = esc(left_ex.args[2]) local_before = :($(tl_storage) = TaskLocalValue{$tls_type}(() -> $(tls_def))) else - tls_sym = esc(left_ex) + tls_sym = esc(left_ex) local_before = :($(tl_storage) = let f = () -> $(tls_def) - TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f) - end) + TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f) + end) end local_name = :($(tls_sym)) return local_before, local_name end - function _maybe_handle_atset_block!(settings, args) idcs = findall(args) do arg arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set") @@ -201,3 +223,43 @@ function _handle_atset_single_assign!(settings, ex) push!(settings.kwargs, sym => esc(def)) end end + +function _maybe_handle_atonlyone_blocks!(args) + idcs = findall(args) do arg + arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@only_one") + end + isnothing(idcs) && return # no @only_one blocks + setup_onlyone_blocks = quote end + for i in idcs + body = args[i].args[3] + @gensym onlyone + init_onlyone_ex = :($(onlyone) = $(OnlyOneRegion())) + push!(setup_onlyone_blocks.args, init_onlyone_ex) + args[i] = quote + Tools.try_enter!($(onlyone)) do + $(esc(body)) + end + end + end + return setup_onlyone_blocks +end + +function _maybe_handle_atonebyone_blocks!(args) + idcs = findall(args) do arg + arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@one_by_one") + end + isnothing(idcs) && return # no @one_by_one blocks + setup_onebyone_blocks = quote end + for i in idcs + body = args[i].args[3] + @gensym onebyone + init_lock_ex = :($(onebyone) = $(Base.ReentrantLock())) + push!(setup_onebyone_blocks.args, init_lock_ex) + args[i] = quote + $(esc(:lock))($(onebyone)) do + $(esc(body)) + end + end + end + return setup_onebyone_blocks +end diff --git a/src/macros.jl b/src/macros.jl index 2c868144..1a51ade9 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -151,3 +151,58 @@ end error("The @local macro may only be used inside of a @tasks block.") end end + +""" + @only_one begin ... end + +This can be used inside a `@tasks for ... end` block to mark a region of code to be +executed by only one of the parallel tasks (all other tasks skip over this region). + +## Example + +```julia +using OhMyThreads: @tasks + +@tasks for i in 1:10 + @set ntasks = 10 + + println(i, ": before") + @only_one begin + println(i, ": only printed by a single task") + sleep(1) + end + println(i, ": after") +end +``` +""" +macro only_one(args...) + error("The @only_one macro may only be used inside of a @tasks block.") +end + +""" + @one_by_one begin ... end + +This can be used inside a `@tasks for ... end` block to mark a region of code to be +executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary +and non-deterministic. + +## Example + +```julia +using OhMyThreads: @tasks + +@tasks for i in 1:10 + @set ntasks = 10 + + println(i, ": before") + @one_by_one begin + println(i, ": one task at a time") + sleep(0.5) + end + println(i, ": after") +end +``` +""" +macro one_by_one(args...) + error("The @one_by_one macro may only be used inside of a @tasks block.") +end diff --git a/src/tools.jl b/src/tools.jl index 0e8983e0..d5663b1e 100644 --- a/src/tools.jl +++ b/src/tools.jl @@ -24,4 +24,62 @@ Return a `UInt` identifier for the current running [Task](https://docs.julialang """ taskid() = objectid(current_task()) +""" +May be used to mark a region in parallel code to be executed by a single task only +(all other tasks shall skip over it). + +See [`try_enter!`](@ref) and [`reset!`](@ref). +""" +mutable struct OnlyOneRegion + @atomic latch::Bool + OnlyOneRegion() = new(false) +end + +""" + try_enter!(f, s::OnlyOneRegion) + +When called from multiple parallel tasks (on a shared `s::OnlyOneRegion`) only a single +task will execute `f`. + +## Example + +```julia +using OhMyThreads: @tasks +using OhMyThreads.Tools: OnlyOneRegion, try_enter! + +only_one = OnlyOneRegion() + +@tasks for i in 1:10 + @set ntasks = 10 + + println(i, ": before") + try_enter!(only_one) do + println(i, ": only printed by a single task") + sleep(1) + end + println(i, ": after") +end +``` +""" +function try_enter!(f, s::OnlyOneRegion) + latch = @atomic :monotonic s.latch + if latch + return + end + (_, success) = @atomicreplace s.latch false=>true + if !success + return + end + f() + return +end + +""" +Reset the `OnlyOneRegion` (so that it can be used again). +""" +function reset!(s::OnlyOneRegion) + @atomicreplace s.latch true=>false + nothing +end + end # Tools diff --git a/test/runtests.jl b/test/runtests.jl index 973a1355..9530c138 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -10,19 +10,21 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +, itrs = ([1 => "a", 2 => "b", 3 => "c", 4 => "d", 5 => "e"],), init = "")] -ChunkedGreedy(;kwargs...) = GreedyScheduler(;kwargs...) +ChunkedGreedy(; kwargs...) = GreedyScheduler(; kwargs...) @testset "Basics" begin for (; ~, f, op, itrs, init) in sets_to_test @testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin @testset for sched in ( - StaticScheduler, DynamicScheduler, GreedyScheduler, DynamicScheduler{OhMyThreads.Schedulers.NoChunking}, SerialScheduler, ChunkedGreedy) + StaticScheduler, DynamicScheduler, GreedyScheduler, + DynamicScheduler{OhMyThreads.Schedulers.NoChunking}, + SerialScheduler, ChunkedGreedy) @testset for split in (:batch, :scatter) for nchunks in (1, 2, 6) if sched == GreedyScheduler scheduler = sched(; ntasks = nchunks) elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking} - scheduler = DynamicScheduler(; chunking=false) + scheduler = DynamicScheduler(; chunking = false) elseif sched == SerialScheduler scheduler = SerialScheduler() else @@ -30,7 +32,8 @@ ChunkedGreedy(;kwargs...) = GreedyScheduler(;kwargs...) end kwargs = (; scheduler) - if (split == :scatter || sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *) + if (split == :scatter || + sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *) # scatter and greedy only works for commutative operators! else mapreduce_f_op_itr = mapreduce(f, op, itrs...) @@ -66,7 +69,8 @@ end; @testset "ChunkSplitters.Chunk" begin x = rand(100) chnks = OhMyThreads.chunks(x; n = Threads.nthreads()) - for scheduler in (DynamicScheduler(; chunking=false), StaticScheduler(; chunking=false)) + for scheduler in ( + DynamicScheduler(; chunking = false), StaticScheduler(; chunking = false)) @testset "$scheduler" begin @test tmap(x -> sin.(x), chnks; scheduler) ≈ map(x -> sin.(x), chnks) @test tmapreduce(x -> sin.(x), vcat, chnks; scheduler) ≈ @@ -86,106 +90,106 @@ end; # reduction @test @tasks(for i in 1:3 - @set reducer=(+) + @set reducer = (+) i end) == 6 # scheduler settings for sched in (StaticScheduler(), DynamicScheduler(), GreedyScheduler()) @test @tasks(for i in 1:3 - @set scheduler=sched + @set scheduler = sched i end) |> isnothing end # scheduler settings as symbols @test @tasks(for i in 1:3 - @set scheduler=:static + @set scheduler = :static i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic + @set scheduler = :dynamic i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:greedy + @set scheduler = :greedy i end) |> isnothing # @set begin ... end @test @tasks(for i in 1:10 @set begin - scheduler=StaticScheduler() - reducer=(+) + scheduler = StaticScheduler() + reducer = (+) end i end) == 55 # multiple @set @test @tasks(for i in 1:10 - @set scheduler=StaticScheduler() + @set scheduler = StaticScheduler() i - @set reducer=(+) + @set reducer = (+) end) == 55 # @set init @test @tasks(for i in 1:10 @set begin - reducer=(+) - init=0.0 + reducer = (+) + init = 0.0 end i end) === 55.0 @test @tasks(for i in 1:10 @set begin - reducer=(+) - init=0.0*im + reducer = (+) + init = 0.0 * im end i end) === (55.0 + 0.0im) # top-level "kwargs" @test @tasks(for i in 1:3 - @set scheduler=:static - @set ntasks=1 + @set scheduler = :static + @set ntasks = 1 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:static - @set nchunks=2 + @set scheduler = :static + @set nchunks = 2 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic - @set chunksize=2 + @set scheduler = :dynamic + @set chunksize = 2 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic - @set chunking=false + @set scheduler = :dynamic + @set chunking = false i end) |> isnothing @test_throws ArgumentError @tasks(for i in 1:3 - @set scheduler=DynamicScheduler() - @set chunking=false + @set scheduler = DynamicScheduler() + @set chunking = false i end) @test_throws MethodError @tasks(for i in 1:3 - @set scheduler=:serial - @set chunking=false + @set scheduler = :serial + @set chunking = false i end) @test_throws MethodError @tasks(for i in 1:3 - @set scheduler=:dynamic - @set asd=123 + @set scheduler = :dynamic + @set asd = 123 i end) # TaskLocalValue - ntd = 2*Threads.nthreads() + ntd = 2 * Threads.nthreads() ptrs = Vector{Ptr{Nothing}}(undef, ntd) tids = Vector{UInt64}(undef, ntd) tid() = OhMyThreads.Tools.taskid() @test @tasks(for i in 1:ntd @local C::Vector{Float64} = rand(3) - @set scheduler=:static + @set scheduler = :static ptrs[i] = pointer_from_objref(C) tids[i] = tid() end) |> isnothing @@ -217,17 +221,17 @@ end; sched = StaticScheduler() sched_sym = :static data = rand(10) - red = (a,b) -> a+b + red = (a, b) -> a + b n = 2 @test @tasks(for d in data - @set scheduler=sched - @set reducer=red + @set scheduler = sched + @set reducer = red var * d end) ≈ var * sum(data) @test @tasks(for d in data - @set scheduler=sched_sym - @set ntasks=n - @set reducer=red + @set scheduler = sched_sym + @set ntasks = n + @set reducer = red var * d end) ≈ var * sum(data) @@ -236,13 +240,14 @@ end; end @test @tasks(for _ in 1:10 @local C = SingleInt(var) - @set reducer=+ + @set reducer = + C.x - end) == 10*var + end) == 10 * var end; @testset "WithTaskLocals" begin - let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)), y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)) + let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)), + y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)) # Equivalent to # function f() # x[][] += 1 @@ -259,7 +264,7 @@ end; # Make sure we can call `f` like a regular function @test f() == (1, 1) @test f() == (2, 2) - @test @fetch( f() ) == (1, 1) + @test @fetch(f()) == (1, 1) # Acceptable use of promise_task_local @test @fetch(promise_task_local(f)()) == (1, 1) # Acceptable use of promise_task_local @@ -283,23 +288,32 @@ end; @testset "chunking mode + chunksize option" begin for sched in (DynamicScheduler, StaticScheduler) @test sched() isa sched - @test sched(; chunksize=2) isa sched - - @test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount - @test OhMyThreads.Schedulers.chunking_mode(sched(; chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2, chunksize=4, chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=-2, chunksize=-4, split=:whatever, chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=-2, chunksize=-4, chunking=false)) == false - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2, chunksize=4, chunking=false)) == false - - @test_throws ArgumentError sched(; nchunks=2, chunksize=3) - @test_throws ArgumentError sched(; nchunks=0, chunksize=0) - @test_throws ArgumentError sched(; nchunks=-2, chunksize=-3) - - let scheduler = sched(; chunksize=2) + @test sched(; chunksize = 2) isa sched + + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize = 2)) == + OhMyThreads.Schedulers.FixedSize + @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks = 2)) == + OhMyThreads.Schedulers.FixedCount + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; + nchunks = 2, chunksize = 4, chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; + nchunks = -2, chunksize = -4, split = :whatever, chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize = 2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks = 2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; + nchunks = -2, chunksize = -4, chunking = false)) == false + @test OhMyThreads.Schedulers.chunking_enabled(sched(; + nchunks = 2, chunksize = 4, chunking = false)) == false + + @test_throws ArgumentError sched(; nchunks = 2, chunksize = 3) + @test_throws ArgumentError sched(; nchunks = 0, chunksize = 0) + @test_throws ArgumentError sched(; nchunks = -2, chunksize = -3) + + let scheduler = sched(; chunksize = 2) @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) @test isnothing(tforeach(sin, 1:10; scheduler)) @@ -312,41 +326,46 @@ end; res_tmr = mapreduce(sin, +, 1:10000) # scheduler not given - @test tmapreduce(sin, +, 1:10000; ntasks=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; nchunks=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; split=:scatter) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunksize=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunking=false) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; split = :scatter) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunksize = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking = false) ≈ res_tmr # scheduler isa Scheduler - @test tmapreduce(sin, +, 1:10000; scheduler=StaticScheduler()) ≈ res_tmr - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=DynamicScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=DynamicScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; split=:scatter, scheduler=StaticScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=SerialScheduler()) + @test tmapreduce(sin, +, 1:10000; scheduler = StaticScheduler()) ≈ res_tmr + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 2, scheduler = DynamicScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; chunksize = 2, scheduler = DynamicScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; split = :scatter, scheduler = StaticScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 3, scheduler = SerialScheduler()) # scheduler isa Symbol for s in (:dynamic, :static, :serial, :greedy) - @test tmapreduce(sin, +, 1:10000; scheduler=s, init=0.0) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; scheduler = s, init = 0.0) ≈ res_tmr end for s in (:dynamic, :static, :greedy) - @test tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=s, init=0.0) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 2, scheduler = s, init = 0.0) ≈ res_tmr end for s in (:dynamic, :static) - @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunking=false, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; nchunks=3, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=s) ≈ res_tmr - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, nchunks=2, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunksize = 2, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking = false, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks = 3, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 3, scheduler = s) ≈ res_tmr + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 3, nchunks = 2, scheduler = s)≈res_tmr end end; @testset "empty collections" begin for empty_coll in (11:9, Float64[]) - for f in (sin, x->im*x, identity) + for f in (sin, x -> im * x, identity) for op in (+, *, min) @test_throws ArgumentError tmapreduce(f, op, empty_coll) - for init in (0.0, 0, 0.0*im, 0f0) + for init in (0.0, 0, 0.0 * im, 0.0f0) @test tmapreduce(f, op, empty_coll; init) == init end @test tforeach(f, empty_coll) |> isnothing @@ -355,4 +374,105 @@ end; end end; +# for testing @one_by_one region +mutable struct SingleAccessOnly + in_use::Bool + const lck::ReentrantLock + SingleAccessOnly() = new(false, ReentrantLock()) +end +function acquire(f, o::SingleAccessOnly) + lock(o.lck) do + o.in_use && throw(ErrorException("Already in use!")) + o.in_use = true + end + try + f() + finally + lock(o.lck) do + !o.in_use && throw(ErrorException("Conflict!")) + o.in_use = false + end + end +end + +@testset "regions" begin + @testset "@one_by_one" begin + sao = SingleAccessOnly() + try + @tasks for i in 1:10 + @set ntasks = 10 + @one_by_one begin + acquire(sao) do + sleep(0.01) + end + end + end + catch ErrorException + @test false + else + @test true + end + + # test escaping + x = 0 + y = 0 + sao = SingleAccessOnly() + try + @tasks for i in 1:10 + @set ntasks = 10 + + y += 1 # not safe (race condition) + @one_by_one begin + x += 1 # parallel-safe because inside of one_by_one region + acquire(sao) do + sleep(0.01) + end + end + end + @test x == 10 + catch ErrorException + @test false + end + end + + @testset "@only_one" begin + x = 0 + y = 0 + try + @tasks for i in 1:10 + @set ntasks = 10 + + y += 1 # not safe (race condition) + @only_one begin + x += 1 # parallel-safe because only a single task will execute this + end + end + @test x == 1 # only a single task should have incremented x + catch ErrorException + @test false + end + end + + @testset "@only_one + @one_by_one" begin + x = 0 + y = 0 + try + @tasks for i in 1:10 + @set ntasks = 10 + + @only_one begin + x += 1 # parallel-safe + end + + @one_by_one begin + y += 1 # parallel-safe + end + end + @test x == 1 && y == 10 + catch ErrorException + @test false + end + end +end; + # Todo way more testing, and easier tests to deal with