From 37837abe9eebcca648903475d85e8ecd4238ab25 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Mon, 18 Mar 2024 18:23:33 +0100 Subject: [PATCH 1/7] critical section --- CHANGELOG.md | 4 + docs/src/refs/api.md | 1 + src/OhMyThreads.jl | 2 +- src/macro_impl.jl | 42 ++++++++- src/macros.jl | 32 +++++++ test/runtests.jl | 214 +++++++++++++++++++++++++++---------------- 6 files changed, 212 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 466b3226..f4b0a3d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ OhMyThreads.jl Changelog ========================= +Version 0.5.1 +------------- +- ![Feature][badge-feature] Within a `@tasks` block one can now mark a section as "critical" via `@section :critical begin ... end`. + Version 0.5.0 ------------- diff --git a/docs/src/refs/api.md b/docs/src/refs/api.md index 2e7b494a..bb24161f 100644 --- a/docs/src/refs/api.md +++ b/docs/src/refs/api.md @@ -11,6 +11,7 @@ CollapsedDocStrings = true @tasks @set @local +@section ``` ### Functions diff --git a/src/OhMyThreads.jl b/src/OhMyThreads.jl index cf8da9a4..39cf8faa 100644 --- a/src/OhMyThreads.jl +++ b/src/OhMyThreads.jl @@ -20,7 +20,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler SerialScheduler include("implementation.jl") -export @tasks, @set, @local +export @tasks, @set, @local, @section export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler diff --git a/src/macro_impl.jl b/src/macro_impl.jl index b4e74a58..235e0f76 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -20,6 +20,7 @@ function tasks_macro(forex) locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args) tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before) _maybe_handle_atset_block!(settings, forbody.args) + setup_sections = _maybe_handle_atsection_blocks!(forbody.args) forbody = esc(forbody) itrng = esc(itrng) @@ -39,6 +40,7 @@ function tasks_macro(forex) end q = if isgiven(settings.reducer) quote + $setup_sections $make_mapping_function tmapreduce(mapping_function, $(settings.reducer), $(itrng)) @@ -46,12 +48,14 @@ function tasks_macro(forex) elseif isgiven(settings.collect) maybe_warn_useless_init(settings) quote + $setup_sections $make_mapping_function tmap(mapping_function, $(itrng)) end else maybe_warn_useless_init(settings) quote + $setup_sections $make_mapping_function tforeach(mapping_function, $(itrng)) end @@ -68,7 +72,7 @@ function tasks_macro(forex) for (k, v) in settings.kwargs push!(kwexpr.args, Expr(:kw, k, v)) end - insert!(q.args[4].args, 2, kwexpr) + insert!(q.args[6].args, 2, kwexpr) # wrap everything in a let ... end block # and, potentially, define the `TaskLocalValue`s. @@ -151,16 +155,15 @@ function _atlocal_assign_to_exprs(ex) tls_type = esc(left_ex.args[2]) local_before = :($(tl_storage) = TaskLocalValue{$tls_type}(() -> $(tls_def))) else - tls_sym = esc(left_ex) + tls_sym = esc(left_ex) local_before = :($(tl_storage) = let f = () -> $(tls_def) - TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f) - end) + TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f) + end) end local_name = :($(tls_sym)) return local_before, local_name end - function _maybe_handle_atset_block!(settings, args) idcs = findall(args) do arg arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set") @@ -201,3 +204,32 @@ function _handle_atset_single_assign!(settings, ex) push!(settings.kwargs, sym => esc(def)) end end + +function _maybe_handle_atsection_blocks!(args) + idcs = findall(args) do arg + arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@section") + end + isnothing(idcs) && return # no @section blocks + setup_sections = quote end + for i in idcs + kind = args[i].args[3] + body = args[i].args[4] + if kind isa QuoteNode + if kind.value == :critical + @gensym critical_lock + init_lock_ex = esc(:($(critical_lock) = $(ReentrantLock()))) + push!(setup_sections.args, init_lock_ex) + args[i] = quote + lock($(critical_lock)) do + $(body) + end + end + else + throw(ErrorException("Unknown section kind :$(kind.value).")) + end + else + throw(ErrorException("Wrong usage of @section. Must be followed by a symbol, indicating the kind of section.")) + end + end + return setup_sections +end diff --git a/src/macros.jl b/src/macros.jl index 2c868144..844bed1f 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -151,3 +151,35 @@ end error("The @local macro may only be used inside of a @tasks block.") end end + +""" + @section kind begin ... end + +This can be used inside a `@tasks for ... end` block to identify a section of code that +must be executed according to a specific synchronization policy +(as indicated by the symbol `kind`, see below). + +Multiple `@section` blocks are supported. + +## Kinds + +* `:critical`: Section of code that must be executed by a single task at a time (arbitrary order). + +## Example + +```julia +@tasks for i in 1:10 + @set ntasks = 10 + + println(i, ": before") + @section :critical begin + println(i, ": critical") + sleep(1) + end + println(i, ": after") +end +``` +""" +macro section(args...) + error("The @section macro may only be used inside of a @tasks block.") +end diff --git a/test/runtests.jl b/test/runtests.jl index 973a1355..4fe17be4 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -10,19 +10,21 @@ sets_to_test = [(~ = isapprox, f = sin ∘ *, op = +, itrs = ([1 => "a", 2 => "b", 3 => "c", 4 => "d", 5 => "e"],), init = "")] -ChunkedGreedy(;kwargs...) = GreedyScheduler(;kwargs...) +ChunkedGreedy(; kwargs...) = GreedyScheduler(; kwargs...) @testset "Basics" begin for (; ~, f, op, itrs, init) in sets_to_test @testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin @testset for sched in ( - StaticScheduler, DynamicScheduler, GreedyScheduler, DynamicScheduler{OhMyThreads.Schedulers.NoChunking}, SerialScheduler, ChunkedGreedy) + StaticScheduler, DynamicScheduler, GreedyScheduler, + DynamicScheduler{OhMyThreads.Schedulers.NoChunking}, + SerialScheduler, ChunkedGreedy) @testset for split in (:batch, :scatter) for nchunks in (1, 2, 6) if sched == GreedyScheduler scheduler = sched(; ntasks = nchunks) elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking} - scheduler = DynamicScheduler(; chunking=false) + scheduler = DynamicScheduler(; chunking = false) elseif sched == SerialScheduler scheduler = SerialScheduler() else @@ -30,7 +32,8 @@ ChunkedGreedy(;kwargs...) = GreedyScheduler(;kwargs...) end kwargs = (; scheduler) - if (split == :scatter || sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *) + if (split == :scatter || + sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *) # scatter and greedy only works for commutative operators! else mapreduce_f_op_itr = mapreduce(f, op, itrs...) @@ -66,7 +69,8 @@ end; @testset "ChunkSplitters.Chunk" begin x = rand(100) chnks = OhMyThreads.chunks(x; n = Threads.nthreads()) - for scheduler in (DynamicScheduler(; chunking=false), StaticScheduler(; chunking=false)) + for scheduler in ( + DynamicScheduler(; chunking = false), StaticScheduler(; chunking = false)) @testset "$scheduler" begin @test tmap(x -> sin.(x), chnks; scheduler) ≈ map(x -> sin.(x), chnks) @test tmapreduce(x -> sin.(x), vcat, chnks; scheduler) ≈ @@ -86,106 +90,106 @@ end; # reduction @test @tasks(for i in 1:3 - @set reducer=(+) + @set reducer = (+) i end) == 6 # scheduler settings for sched in (StaticScheduler(), DynamicScheduler(), GreedyScheduler()) @test @tasks(for i in 1:3 - @set scheduler=sched + @set scheduler = sched i end) |> isnothing end # scheduler settings as symbols @test @tasks(for i in 1:3 - @set scheduler=:static + @set scheduler = :static i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic + @set scheduler = :dynamic i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:greedy + @set scheduler = :greedy i end) |> isnothing # @set begin ... end @test @tasks(for i in 1:10 @set begin - scheduler=StaticScheduler() - reducer=(+) + scheduler = StaticScheduler() + reducer = (+) end i end) == 55 # multiple @set @test @tasks(for i in 1:10 - @set scheduler=StaticScheduler() + @set scheduler = StaticScheduler() i - @set reducer=(+) + @set reducer = (+) end) == 55 # @set init @test @tasks(for i in 1:10 @set begin - reducer=(+) - init=0.0 + reducer = (+) + init = 0.0 end i end) === 55.0 @test @tasks(for i in 1:10 @set begin - reducer=(+) - init=0.0*im + reducer = (+) + init = 0.0 * im end i end) === (55.0 + 0.0im) # top-level "kwargs" @test @tasks(for i in 1:3 - @set scheduler=:static - @set ntasks=1 + @set scheduler = :static + @set ntasks = 1 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:static - @set nchunks=2 + @set scheduler = :static + @set nchunks = 2 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic - @set chunksize=2 + @set scheduler = :dynamic + @set chunksize = 2 i end) |> isnothing @test @tasks(for i in 1:3 - @set scheduler=:dynamic - @set chunking=false + @set scheduler = :dynamic + @set chunking = false i end) |> isnothing @test_throws ArgumentError @tasks(for i in 1:3 - @set scheduler=DynamicScheduler() - @set chunking=false + @set scheduler = DynamicScheduler() + @set chunking = false i end) @test_throws MethodError @tasks(for i in 1:3 - @set scheduler=:serial - @set chunking=false + @set scheduler = :serial + @set chunking = false i end) @test_throws MethodError @tasks(for i in 1:3 - @set scheduler=:dynamic - @set asd=123 + @set scheduler = :dynamic + @set asd = 123 i end) # TaskLocalValue - ntd = 2*Threads.nthreads() + ntd = 2 * Threads.nthreads() ptrs = Vector{Ptr{Nothing}}(undef, ntd) tids = Vector{UInt64}(undef, ntd) tid() = OhMyThreads.Tools.taskid() @test @tasks(for i in 1:ntd @local C::Vector{Float64} = rand(3) - @set scheduler=:static + @set scheduler = :static ptrs[i] = pointer_from_objref(C) tids[i] = tid() end) |> isnothing @@ -217,17 +221,17 @@ end; sched = StaticScheduler() sched_sym = :static data = rand(10) - red = (a,b) -> a+b + red = (a, b) -> a + b n = 2 @test @tasks(for d in data - @set scheduler=sched - @set reducer=red + @set scheduler = sched + @set reducer = red var * d end) ≈ var * sum(data) @test @tasks(for d in data - @set scheduler=sched_sym - @set ntasks=n - @set reducer=red + @set scheduler = sched_sym + @set ntasks = n + @set reducer = red var * d end) ≈ var * sum(data) @@ -236,13 +240,14 @@ end; end @test @tasks(for _ in 1:10 @local C = SingleInt(var) - @set reducer=+ + @set reducer = + C.x - end) == 10*var + end) == 10 * var end; @testset "WithTaskLocals" begin - let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)), y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)) + let x = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)), + y = TaskLocalValue{Base.RefValue{Int}}(() -> Ref{Int}(0)) # Equivalent to # function f() # x[][] += 1 @@ -259,7 +264,7 @@ end; # Make sure we can call `f` like a regular function @test f() == (1, 1) @test f() == (2, 2) - @test @fetch( f() ) == (1, 1) + @test @fetch(f()) == (1, 1) # Acceptable use of promise_task_local @test @fetch(promise_task_local(f)()) == (1, 1) # Acceptable use of promise_task_local @@ -283,23 +288,32 @@ end; @testset "chunking mode + chunksize option" begin for sched in (DynamicScheduler, StaticScheduler) @test sched() isa sched - @test sched(; chunksize=2) isa sched + @test sched(; chunksize = 2) isa sched - @test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize=2)) == OhMyThreads.Schedulers.FixedSize - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2)) == OhMyThreads.Schedulers.FixedCount - @test OhMyThreads.Schedulers.chunking_mode(sched(; chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=2, chunksize=4, chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks=-2, chunksize=-4, split=:whatever, chunking=false)) == OhMyThreads.Schedulers.NoChunking - @test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2)) == true - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=-2, chunksize=-4, chunking=false)) == false - @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks=2, chunksize=4, chunking=false)) == false + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunksize = 2)) == + OhMyThreads.Schedulers.FixedSize + @test OhMyThreads.Schedulers.chunking_mode(sched(; nchunks = 2)) == + OhMyThreads.Schedulers.FixedCount + @test OhMyThreads.Schedulers.chunking_mode(sched(; chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; + nchunks = 2, chunksize = 4, chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_mode(sched(; + nchunks = -2, chunksize = -4, split = :whatever, chunking = false)) == + OhMyThreads.Schedulers.NoChunking + @test OhMyThreads.Schedulers.chunking_enabled(sched(; chunksize = 2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; nchunks = 2)) == true + @test OhMyThreads.Schedulers.chunking_enabled(sched(; + nchunks = -2, chunksize = -4, chunking = false)) == false + @test OhMyThreads.Schedulers.chunking_enabled(sched(; + nchunks = 2, chunksize = 4, chunking = false)) == false - @test_throws ArgumentError sched(; nchunks=2, chunksize=3) - @test_throws ArgumentError sched(; nchunks=0, chunksize=0) - @test_throws ArgumentError sched(; nchunks=-2, chunksize=-3) + @test_throws ArgumentError sched(; nchunks = 2, chunksize = 3) + @test_throws ArgumentError sched(; nchunks = 0, chunksize = 0) + @test_throws ArgumentError sched(; nchunks = -2, chunksize = -3) - let scheduler = sched(; chunksize=2) + let scheduler = sched(; chunksize = 2) @test tmapreduce(sin, +, 1:10; scheduler) ≈ mapreduce(sin, +, 1:10) @test tmap(sin, 1:10; scheduler) ≈ map(sin, 1:10) @test isnothing(tforeach(sin, 1:10; scheduler)) @@ -312,41 +326,46 @@ end; res_tmr = mapreduce(sin, +, 1:10000) # scheduler not given - @test tmapreduce(sin, +, 1:10000; ntasks=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; nchunks=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; split=:scatter) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunksize=2) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunking=false) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; split = :scatter) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunksize = 2) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking = false) ≈ res_tmr # scheduler isa Scheduler - @test tmapreduce(sin, +, 1:10000; scheduler=StaticScheduler()) ≈ res_tmr - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=DynamicScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=DynamicScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; split=:scatter, scheduler=StaticScheduler()) - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=SerialScheduler()) + @test tmapreduce(sin, +, 1:10000; scheduler = StaticScheduler()) ≈ res_tmr + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 2, scheduler = DynamicScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; chunksize = 2, scheduler = DynamicScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; split = :scatter, scheduler = StaticScheduler()) + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 3, scheduler = SerialScheduler()) # scheduler isa Symbol for s in (:dynamic, :static, :serial, :greedy) - @test tmapreduce(sin, +, 1:10000; scheduler=s, init=0.0) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; scheduler = s, init = 0.0) ≈ res_tmr end for s in (:dynamic, :static, :greedy) - @test tmapreduce(sin, +, 1:10000; ntasks=2, scheduler=s, init=0.0) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 2, scheduler = s, init = 0.0) ≈ res_tmr end for s in (:dynamic, :static) - @test tmapreduce(sin, +, 1:10000; chunksize=2, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; chunking=false, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; nchunks=3, scheduler=s) ≈ res_tmr - @test tmapreduce(sin, +, 1:10000; ntasks=3, scheduler=s) ≈ res_tmr - @test_throws ArgumentError tmapreduce(sin, +, 1:10000; ntasks=3, nchunks=2, scheduler=s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunksize = 2, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; chunking = false, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; nchunks = 3, scheduler = s) ≈ res_tmr + @test tmapreduce(sin, +, 1:10000; ntasks = 3, scheduler = s) ≈ res_tmr + @test_throws ArgumentError tmapreduce( + sin, +, 1:10000; ntasks = 3, nchunks = 2, scheduler = s)≈res_tmr end end; @testset "empty collections" begin for empty_coll in (11:9, Float64[]) - for f in (sin, x->im*x, identity) + for f in (sin, x -> im * x, identity) for op in (+, *, min) @test_throws ArgumentError tmapreduce(f, op, empty_coll) - for init in (0.0, 0, 0.0*im, 0f0) + for init in (0.0, 0, 0.0 * im, 0.0f0) @test tmapreduce(f, op, empty_coll; init) == init end @test tforeach(f, empty_coll) |> isnothing @@ -355,4 +374,45 @@ end; end end; +# for testing @section :critical +mutable struct SingleAccessOnly + in_use::Bool + const lck::ReentrantLock + SingleAccessOnly() = new(false, ReentrantLock()) +end +function acquire(f, o::SingleAccessOnly) + lock(o.lck) do + o.in_use && throw(ErrorException("Already in use!")) + o.in_use = true + end + try + f() + finally + lock(o.lck) do + !o.in_use && throw(ErrorException("Conflict!")) + o.in_use = false + end + end +end + +@testset "sections" begin + @testset ":critical" begin + sao = SingleAccessOnly() + try + @tasks for i in 1:10 + @set ntasks = 10 + @section :critical begin + acquire(sao) do + sleep(0.01) + end + end + end + catch ErrorException + @test false + else + @test true + end + end +end; + # Todo way more testing, and easier tests to deal with From ce27d2d690ca3f22e01965b9f8e1b05f21411246 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Mon, 18 Mar 2024 18:31:57 +0100 Subject: [PATCH 2/7] minor docstring update --- src/macros.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/macros.jl b/src/macros.jl index 844bed1f..7c331aa8 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -173,7 +173,7 @@ Multiple `@section` blocks are supported. println(i, ": before") @section :critical begin - println(i, ": critical") + println(i, ": one task at a time") sleep(1) end println(i, ": after") From 01deac57b3d9d9739893d2d5671119317788e6be Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Mon, 18 Mar 2024 19:33:56 +0100 Subject: [PATCH 3/7] section single; overhaul of forbody escaping --- CHANGELOG.md | 3 ++- src/macro_impl.jl | 31 +++++++++++++++++++++++++++---- src/macros.jl | 14 ++++++++++++++ src/tools.jl | 44 ++++++++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 39 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 126 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f4b0a3d8..f1712ce1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ OhMyThreads.jl Changelog Version 0.5.1 ------------- -- ![Feature][badge-feature] Within a `@tasks` block one can now mark a section as "critical" via `@section :critical begin ... end`. +- ![Feature][badge-feature] Within a `@tasks` block one can now mark a section as "critical" via `@section :critical begin ... end`. This section will be run by one task at a time. +- ![Feature][badge-feature] Within a `@tasks` block one can now mark a section as "single" via `@section :single begin ... end`. This section will be run by a single task only. Version 0.5.0 ------------- diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 235e0f76..97d46c95 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -1,3 +1,5 @@ +using OhMyThreads.Tools: SectionSingle, try_enter + function tasks_macro(forex) if forex.head != :for throw(ErrorException("Expected a for loop after `@tasks`.")) @@ -17,12 +19,22 @@ function tasks_macro(forex) settings = Settings() + # Escape everything in the loop body that is not used in conjuction with one of our + # "macros", e.g. @set or @local. Code inside of these macro blocks will be escaped by + # the respective "macro" handling functions below. + for i in findall(forbody.args) do arg + !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")) && + !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@local")) && + !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@section")) + end + forbody.args[i] = esc(forbody.args[i]) + end + locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args) tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before) _maybe_handle_atset_block!(settings, forbody.args) setup_sections = _maybe_handle_atsection_blocks!(forbody.args) - forbody = esc(forbody) itrng = esc(itrng) itvar = esc(itvar) @@ -217,11 +229,22 @@ function _maybe_handle_atsection_blocks!(args) if kind isa QuoteNode if kind.value == :critical @gensym critical_lock - init_lock_ex = esc(:($(critical_lock) = $(ReentrantLock()))) + init_lock_ex = :($(critical_lock) = $(Base.ReentrantLock())) + # init_lock_ex = esc(:($(critical_lock) = $(Base.ReentrantLock()))) push!(setup_sections.args, init_lock_ex) args[i] = quote - lock($(critical_lock)) do - $(body) + $(esc(:lock))($(critical_lock)) do + $(esc(body)) + end + end + elseif kind.value == :single + @gensym single_section + # init_single_section_ex = esc(:($(single_section) = $(SectionSingle()))) + init_single_section_ex = :($(single_section) = $(SectionSingle())) + push!(setup_sections.args, init_single_section_ex) + args[i] = quote + Tools.try_enter($(single_section)) do + $(esc(body)) end end else diff --git a/src/macros.jl b/src/macros.jl index 7c331aa8..46252829 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -164,6 +164,7 @@ Multiple `@section` blocks are supported. ## Kinds * `:critical`: Section of code that must be executed by a single task at a time (arbitrary order). +* `:single`: Section of code that must be executed by a single task only. All other tasks will skip over this section. ## Example @@ -179,6 +180,19 @@ Multiple `@section` blocks are supported. println(i, ": after") end ``` + +```julia +@tasks for i in 1:10 + @set ntasks = 10 + + println(i, ": before") + @section :single begin + println(i, ": only printed by a single task") + sleep(1) + end + println(i, ": after") +end +``` """ macro section(args...) error("The @section macro may only be used inside of a @tasks block.") diff --git a/src/tools.jl b/src/tools.jl index 0e8983e0..f6a1d0d4 100644 --- a/src/tools.jl +++ b/src/tools.jl @@ -24,4 +24,48 @@ Return a `UInt` identifier for the current running [Task](https://docs.julialang """ taskid() = objectid(current_task()) +""" +When `try_enter(s::SectionSingle) do ... end` is called from multiple parallel tasks only +a single task will run the content of the `do ... end` block. +""" +struct SectionSingle + first::Base.RefValue{Bool} + lck::ReentrantLock + SectionSingle() = new(Ref(true), ReentrantLock()) +end + +""" + try_enter(f, s::SectionSingle) + +When called from multiple parallel tasks (on a shared `s::SectionSingle`) only a single +task will execute `f`. Typical usage: + +```julia +using OhMyThreads.Tools: SectionSingle + +s = SectionSingle() + +@tasks for i in 1:10 + @set ntasks = 10 + + println(i, ": before") + try_enter(s) do + println(i, ": only printed by a single task") + sleep(1) + end + println(i, ": after") +end +``` +""" +function try_enter(f, s::SectionSingle) + run_f = false + lock(s.lck) do + if s.first[] + run_f = true # The first task to try_enter → run f + s.first[] = false + end + end + run_f && f() +end + end # Tools diff --git a/test/runtests.jl b/test/runtests.jl index 4fe17be4..3e867b6b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -412,6 +412,45 @@ end else @test true end + + # test escaping + x = 0 + y = 0 + sao = SingleAccessOnly() + try + @tasks for i in 1:10 + @set ntasks = 10 + + y += 1 # not safe (race condition) + @section :critical begin + x += 1 # parallel-safe because inside of critical section + acquire(sao) do + sleep(0.01) + end + end + end + @test x == 10 + catch ErrorException + @test false + end + end + + @testset ":single" begin + x = 0 + y = 0 + try + @tasks for i in 1:10 + @set ntasks = 10 + + y += 1 # not safe (race condition) + @section :single begin + x += 1 # parallel-safe because inside of single section + end + end + @test x == 1 # only a single task should have incremented x + catch ErrorException + @test false + end end end; From bddd985d3eb835c319c8c6650542a8a67cadfa59 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Mon, 18 Mar 2024 19:41:13 +0100 Subject: [PATCH 4/7] update docstring --- src/tools.jl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tools.jl b/src/tools.jl index f6a1d0d4..5e4079ed 100644 --- a/src/tools.jl +++ b/src/tools.jl @@ -25,8 +25,10 @@ Return a `UInt` identifier for the current running [Task](https://docs.julialang taskid() = objectid(current_task()) """ -When `try_enter(s::SectionSingle) do ... end` is called from multiple parallel tasks only -a single task will run the content of the `do ... end` block. +May be used to implement a "single" section in parallel code. This section will only be +run by a single task (other tasks will skip over it). + +See [`try_enter`](@ref). """ struct SectionSingle first::Base.RefValue{Bool} From fd327461bfadda8ce314207e0d1baa49c2ddc4a3 Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Mon, 18 Mar 2024 19:48:28 +0100 Subject: [PATCH 5/7] drop comments --- src/macro_impl.jl | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 97d46c95..96096bcc 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -230,7 +230,6 @@ function _maybe_handle_atsection_blocks!(args) if kind.value == :critical @gensym critical_lock init_lock_ex = :($(critical_lock) = $(Base.ReentrantLock())) - # init_lock_ex = esc(:($(critical_lock) = $(Base.ReentrantLock()))) push!(setup_sections.args, init_lock_ex) args[i] = quote $(esc(:lock))($(critical_lock)) do @@ -239,7 +238,6 @@ function _maybe_handle_atsection_blocks!(args) end elseif kind.value == :single @gensym single_section - # init_single_section_ex = esc(:($(single_section) = $(SectionSingle()))) init_single_section_ex = :($(single_section) = $(SectionSingle())) push!(setup_sections.args, init_single_section_ex) args[i] = quote From e614fc30f4e4c4a5cde62808ab0c968ad03e8a5a Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 19 Mar 2024 14:58:11 +0100 Subject: [PATCH 6/7] one_by_one and one_only --- CHANGELOG.md | 4 +-- docs/src/refs/api.md | 3 +- src/OhMyThreads.jl | 2 +- src/macro_impl.jl | 83 ++++++++++++++++++++++++-------------------- src/macros.jl | 45 ++++++++++++++---------- src/tools.jl | 54 +++++++++++++++++----------- test/runtests.jl | 39 ++++++++++++++++----- 7 files changed, 141 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1712ce1..d017c450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,8 @@ OhMyThreads.jl Changelog Version 0.5.1 ------------- -- ![Feature][badge-feature] Within a `@tasks` block one can now mark a section as "critical" via `@section :critical begin ... end`. This section will be run by one task at a time. -- ![Feature][badge-feature] Within a `@tasks` block one can now mark a section as "single" via `@section :single begin ... end`. This section will be run by a single task only. +- ![Feature][badge-feature] Within a parallel `@tasks` block one can now mark a region with `@one_by_one`. This region will be run by one task at a time ("critical region"). +- ![Feature][badge-feature] Within a `@tasks` block one can now mark a region as with `@one_only`. This region will be run by a single parallel task only (other tasks will skip over it). Version 0.5.0 ------------- diff --git a/docs/src/refs/api.md b/docs/src/refs/api.md index bb24161f..240a7dfb 100644 --- a/docs/src/refs/api.md +++ b/docs/src/refs/api.md @@ -11,7 +11,8 @@ CollapsedDocStrings = true @tasks @set @local -@section +@one_only +@one_by_one ``` ### Functions diff --git a/src/OhMyThreads.jl b/src/OhMyThreads.jl index 39cf8faa..dd2d2cba 100644 --- a/src/OhMyThreads.jl +++ b/src/OhMyThreads.jl @@ -20,7 +20,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler SerialScheduler include("implementation.jl") -export @tasks, @set, @local, @section +export @tasks, @set, @local, @one_by_one, @one_only export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 96096bcc..129c736e 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -1,4 +1,4 @@ -using OhMyThreads.Tools: SectionSingle, try_enter +using OhMyThreads.Tools: OneOnlyRegion, try_enter! function tasks_macro(forex) if forex.head != :for @@ -25,7 +25,10 @@ function tasks_macro(forex) for i in findall(forbody.args) do arg !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")) && !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@local")) && - !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@section")) + !(arg isa Expr && arg.head == :macrocall && + arg.args[1] == Symbol("@one_only")) && + !(arg isa Expr && arg.head == :macrocall && + arg.args[1] == Symbol("@one_by_one")) end forbody.args[i] = esc(forbody.args[i]) end @@ -33,7 +36,8 @@ function tasks_macro(forex) locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args) tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before) _maybe_handle_atset_block!(settings, forbody.args) - setup_sections = _maybe_handle_atsection_blocks!(forbody.args) + setup_oneonly_blocks = _maybe_handle_atoneonly_blocks!(forbody.args) + setup_onebyone_blocks = _maybe_handle_atonebyone_blocks!(forbody.args) itrng = esc(itrng) itvar = esc(itvar) @@ -52,7 +56,8 @@ function tasks_macro(forex) end q = if isgiven(settings.reducer) quote - $setup_sections + $setup_oneonly_blocks + $setup_onebyone_blocks $make_mapping_function tmapreduce(mapping_function, $(settings.reducer), $(itrng)) @@ -60,14 +65,16 @@ function tasks_macro(forex) elseif isgiven(settings.collect) maybe_warn_useless_init(settings) quote - $setup_sections + $setup_oneonly_blocks + $setup_onebyone_blocks $make_mapping_function tmap(mapping_function, $(itrng)) end else maybe_warn_useless_init(settings) quote - $setup_sections + $setup_oneonly_blocks + $setup_onebyone_blocks $make_mapping_function tforeach(mapping_function, $(itrng)) end @@ -84,7 +91,7 @@ function tasks_macro(forex) for (k, v) in settings.kwargs push!(kwexpr.args, Expr(:kw, k, v)) end - insert!(q.args[6].args, 2, kwexpr) + insert!(q.args[8].args, 2, kwexpr) # wrap everything in a let ... end block # and, potentially, define the `TaskLocalValue`s. @@ -217,40 +224,42 @@ function _handle_atset_single_assign!(settings, ex) end end -function _maybe_handle_atsection_blocks!(args) +function _maybe_handle_atoneonly_blocks!(args) idcs = findall(args) do arg - arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@section") + arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@one_only") end - isnothing(idcs) && return # no @section blocks - setup_sections = quote end + isnothing(idcs) && return # no @one_only blocks + setup_oneonly_blocks = quote end for i in idcs - kind = args[i].args[3] - body = args[i].args[4] - if kind isa QuoteNode - if kind.value == :critical - @gensym critical_lock - init_lock_ex = :($(critical_lock) = $(Base.ReentrantLock())) - push!(setup_sections.args, init_lock_ex) - args[i] = quote - $(esc(:lock))($(critical_lock)) do - $(esc(body)) - end - end - elseif kind.value == :single - @gensym single_section - init_single_section_ex = :($(single_section) = $(SectionSingle())) - push!(setup_sections.args, init_single_section_ex) - args[i] = quote - Tools.try_enter($(single_section)) do - $(esc(body)) - end - end - else - throw(ErrorException("Unknown section kind :$(kind.value).")) + body = args[i].args[3] + @gensym oneonly + init_oneonly_ex = :($(oneonly) = $(OneOnlyRegion())) + push!(setup_oneonly_blocks.args, init_oneonly_ex) + args[i] = quote + Tools.try_enter!($(oneonly)) do + $(esc(body)) + end + end + end + return setup_oneonly_blocks +end + +function _maybe_handle_atonebyone_blocks!(args) + idcs = findall(args) do arg + arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@one_by_one") + end + isnothing(idcs) && return # no @one_by_one blocks + setup_onebyone_blocks = quote end + for i in idcs + body = args[i].args[3] + @gensym onebyone + init_lock_ex = :($(onebyone) = $(Base.ReentrantLock())) + push!(setup_onebyone_blocks.args, init_lock_ex) + args[i] = quote + $(esc(:lock))($(onebyone)) do + $(esc(body)) end - else - throw(ErrorException("Wrong usage of @section. Must be followed by a symbol, indicating the kind of section.")) end end - return setup_sections + return setup_onebyone_blocks end diff --git a/src/macros.jl b/src/macros.jl index 46252829..66da213e 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -153,47 +153,56 @@ end end """ - @section kind begin ... end + @one_only begin ... end -This can be used inside a `@tasks for ... end` block to identify a section of code that -must be executed according to a specific synchronization policy -(as indicated by the symbol `kind`, see below). - -Multiple `@section` blocks are supported. - -## Kinds - -* `:critical`: Section of code that must be executed by a single task at a time (arbitrary order). -* `:single`: Section of code that must be executed by a single task only. All other tasks will skip over this section. +This can be used inside a `@tasks for ... end` block to mark a region of code to be +executed by only one of the parallel tasks (all other tasks skip over this region). ## Example ```julia +using OhMyThreads: @tasks + @tasks for i in 1:10 @set ntasks = 10 println(i, ": before") - @section :critical begin - println(i, ": one task at a time") + @one_only begin + println(i, ": only printed by a single task") sleep(1) end println(i, ": after") end ``` +""" +macro one_only(args...) + error("The @one_only macro may only be used inside of a @tasks block.") +end + +""" + @one_by_one begin ... end + +This can be used inside a `@tasks for ... end` block to mark a region of code to be +executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary +and non-deterministic. + +## Example ```julia +using OhMyThreads: @tasks + @tasks for i in 1:10 @set ntasks = 10 println(i, ": before") - @section :single begin - println(i, ": only printed by a single task") - sleep(1) + @one_by_one begin + println(i, ": one task at a time") + sleep(0.5) end println(i, ": after") end ``` """ -macro section(args...) - error("The @section macro may only be used inside of a @tasks block.") +macro one_by_one(args...) + error("The @one_by_one macro may only be used inside of a @tasks block.") end diff --git a/src/tools.jl b/src/tools.jl index 5e4079ed..6fe73c32 100644 --- a/src/tools.jl +++ b/src/tools.jl @@ -25,33 +25,35 @@ Return a `UInt` identifier for the current running [Task](https://docs.julialang taskid() = objectid(current_task()) """ -May be used to implement a "single" section in parallel code. This section will only be -run by a single task (other tasks will skip over it). +May be used to mark a region in parallel code to be executed by a single task only +(all other tasks shall skip over it). -See [`try_enter`](@ref). +See [`try_enter!`](@ref) and [`reset!`](@ref). """ -struct SectionSingle - first::Base.RefValue{Bool} - lck::ReentrantLock - SectionSingle() = new(Ref(true), ReentrantLock()) +mutable struct OneOnlyRegion + @atomic latch::Bool + OneOnlyRegion() = new(false) end """ - try_enter(f, s::SectionSingle) + try_enter!(f, s::OneOnlyRegion) -When called from multiple parallel tasks (on a shared `s::SectionSingle`) only a single -task will execute `f`. Typical usage: +When called from multiple parallel tasks (on a shared `s::OneOnlyRegion`) only a single +task will execute `f`. + +## Example ```julia -using OhMyThreads.Tools: SectionSingle +using OhMyThreads: @tasks +using OhMyThreads.Tools: OneOnlyRegion, try_enter! -s = SectionSingle() +one_only = OneOnlyRegion() @tasks for i in 1:10 @set ntasks = 10 println(i, ": before") - try_enter(s) do + try_enter!(one_only) do println(i, ": only printed by a single task") sleep(1) end @@ -59,15 +61,25 @@ s = SectionSingle() end ``` """ -function try_enter(f, s::SectionSingle) - run_f = false - lock(s.lck) do - if s.first[] - run_f = true # The first task to try_enter → run f - s.first[] = false - end +function try_enter!(f, s::OneOnlyRegion) + latch = @atomic :monotonic s.latch + if latch + return + end + (_, success) = @atomicreplace s.latch false=>true + if !success + return end - run_f && f() + f() + return +end + +""" +Reset the `OneOnlyRegion` (so that it can be used again). +""" +function reset!(s::OneOnlyRegion) + @atomicreplace s.latch true=>false + nothing end end # Tools diff --git a/test/runtests.jl b/test/runtests.jl index 3e867b6b..fa6bed26 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -374,7 +374,7 @@ end; end end; -# for testing @section :critical +# for testing @one_by_one region mutable struct SingleAccessOnly in_use::Bool const lck::ReentrantLock @@ -395,13 +395,13 @@ function acquire(f, o::SingleAccessOnly) end end -@testset "sections" begin - @testset ":critical" begin +@testset "regions" begin + @testset "@one_by_one" begin sao = SingleAccessOnly() try @tasks for i in 1:10 @set ntasks = 10 - @section :critical begin + @one_by_one begin acquire(sao) do sleep(0.01) end @@ -422,8 +422,8 @@ end @set ntasks = 10 y += 1 # not safe (race condition) - @section :critical begin - x += 1 # parallel-safe because inside of critical section + @one_by_one begin + x += 1 # parallel-safe because inside of one_by_one region acquire(sao) do sleep(0.01) end @@ -435,7 +435,7 @@ end end end - @testset ":single" begin + @testset "@one_only" begin x = 0 y = 0 try @@ -443,8 +443,8 @@ end @set ntasks = 10 y += 1 # not safe (race condition) - @section :single begin - x += 1 # parallel-safe because inside of single section + @one_only begin + x += 1 # parallel-safe because only a single task will execute this end end @test x == 1 # only a single task should have incremented x @@ -452,6 +452,27 @@ end @test false end end + + @testset "@one_only + @one_by_one" begin + x = 0 + y = 0 + try + @tasks for i in 1:10 + @set ntasks = 10 + + @one_only begin + x += 1 # parallel-safe + end + + @one_by_one begin + y += 1 # parallel-safe + end + end + @test x == 1 && y == 10 + catch ErrorException + @test false + end + end end; # Todo way more testing, and easier tests to deal with From 74d464f87eefdbae5e02a7213e3f4f71bd58269b Mon Sep 17 00:00:00 2001 From: Carsten Bauer Date: Tue, 19 Mar 2024 15:09:24 +0100 Subject: [PATCH 7/7] one_only -> only_one --- CHANGELOG.md | 2 +- docs/src/refs/api.md | 2 +- src/OhMyThreads.jl | 2 +- src/macro_impl.jl | 30 +++++++++++++++--------------- src/macros.jl | 8 ++++---- src/tools.jl | 20 ++++++++++---------- test/runtests.jl | 8 ++++---- 7 files changed, 36 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d017c450..c11095f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ OhMyThreads.jl Changelog Version 0.5.1 ------------- - ![Feature][badge-feature] Within a parallel `@tasks` block one can now mark a region with `@one_by_one`. This region will be run by one task at a time ("critical region"). -- ![Feature][badge-feature] Within a `@tasks` block one can now mark a region as with `@one_only`. This region will be run by a single parallel task only (other tasks will skip over it). +- ![Feature][badge-feature] Within a `@tasks` block one can now mark a region as with `@only_one`. This region will be run by a single parallel task only (other tasks will skip over it). Version 0.5.0 ------------- diff --git a/docs/src/refs/api.md b/docs/src/refs/api.md index 240a7dfb..3f9134db 100644 --- a/docs/src/refs/api.md +++ b/docs/src/refs/api.md @@ -11,7 +11,7 @@ CollapsedDocStrings = true @tasks @set @local -@one_only +@only_one @one_by_one ``` diff --git a/src/OhMyThreads.jl b/src/OhMyThreads.jl index dd2d2cba..c4592a84 100644 --- a/src/OhMyThreads.jl +++ b/src/OhMyThreads.jl @@ -20,7 +20,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler SerialScheduler include("implementation.jl") -export @tasks, @set, @local, @one_by_one, @one_only +export @tasks, @set, @local, @one_by_one, @only_one export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler diff --git a/src/macro_impl.jl b/src/macro_impl.jl index 129c736e..0ae1ecec 100644 --- a/src/macro_impl.jl +++ b/src/macro_impl.jl @@ -1,4 +1,4 @@ -using OhMyThreads.Tools: OneOnlyRegion, try_enter! +using OhMyThreads.Tools: OnlyOneRegion, try_enter! function tasks_macro(forex) if forex.head != :for @@ -26,7 +26,7 @@ function tasks_macro(forex) !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")) && !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@local")) && !(arg isa Expr && arg.head == :macrocall && - arg.args[1] == Symbol("@one_only")) && + arg.args[1] == Symbol("@only_one")) && !(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@one_by_one")) end @@ -36,7 +36,7 @@ function tasks_macro(forex) locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args) tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before) _maybe_handle_atset_block!(settings, forbody.args) - setup_oneonly_blocks = _maybe_handle_atoneonly_blocks!(forbody.args) + setup_onlyone_blocks = _maybe_handle_atonlyone_blocks!(forbody.args) setup_onebyone_blocks = _maybe_handle_atonebyone_blocks!(forbody.args) itrng = esc(itrng) @@ -56,7 +56,7 @@ function tasks_macro(forex) end q = if isgiven(settings.reducer) quote - $setup_oneonly_blocks + $setup_onlyone_blocks $setup_onebyone_blocks $make_mapping_function tmapreduce(mapping_function, $(settings.reducer), @@ -65,7 +65,7 @@ function tasks_macro(forex) elseif isgiven(settings.collect) maybe_warn_useless_init(settings) quote - $setup_oneonly_blocks + $setup_onlyone_blocks $setup_onebyone_blocks $make_mapping_function tmap(mapping_function, $(itrng)) @@ -73,7 +73,7 @@ function tasks_macro(forex) else maybe_warn_useless_init(settings) quote - $setup_oneonly_blocks + $setup_onlyone_blocks $setup_onebyone_blocks $make_mapping_function tforeach(mapping_function, $(itrng)) @@ -224,24 +224,24 @@ function _handle_atset_single_assign!(settings, ex) end end -function _maybe_handle_atoneonly_blocks!(args) +function _maybe_handle_atonlyone_blocks!(args) idcs = findall(args) do arg - arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@one_only") + arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@only_one") end - isnothing(idcs) && return # no @one_only blocks - setup_oneonly_blocks = quote end + isnothing(idcs) && return # no @only_one blocks + setup_onlyone_blocks = quote end for i in idcs body = args[i].args[3] - @gensym oneonly - init_oneonly_ex = :($(oneonly) = $(OneOnlyRegion())) - push!(setup_oneonly_blocks.args, init_oneonly_ex) + @gensym onlyone + init_onlyone_ex = :($(onlyone) = $(OnlyOneRegion())) + push!(setup_onlyone_blocks.args, init_onlyone_ex) args[i] = quote - Tools.try_enter!($(oneonly)) do + Tools.try_enter!($(onlyone)) do $(esc(body)) end end end - return setup_oneonly_blocks + return setup_onlyone_blocks end function _maybe_handle_atonebyone_blocks!(args) diff --git a/src/macros.jl b/src/macros.jl index 66da213e..1a51ade9 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -153,7 +153,7 @@ end end """ - @one_only begin ... end + @only_one begin ... end This can be used inside a `@tasks for ... end` block to mark a region of code to be executed by only one of the parallel tasks (all other tasks skip over this region). @@ -167,7 +167,7 @@ using OhMyThreads: @tasks @set ntasks = 10 println(i, ": before") - @one_only begin + @only_one begin println(i, ": only printed by a single task") sleep(1) end @@ -175,8 +175,8 @@ using OhMyThreads: @tasks end ``` """ -macro one_only(args...) - error("The @one_only macro may only be used inside of a @tasks block.") +macro only_one(args...) + error("The @only_one macro may only be used inside of a @tasks block.") end """ diff --git a/src/tools.jl b/src/tools.jl index 6fe73c32..d5663b1e 100644 --- a/src/tools.jl +++ b/src/tools.jl @@ -30,30 +30,30 @@ May be used to mark a region in parallel code to be executed by a single task on See [`try_enter!`](@ref) and [`reset!`](@ref). """ -mutable struct OneOnlyRegion +mutable struct OnlyOneRegion @atomic latch::Bool - OneOnlyRegion() = new(false) + OnlyOneRegion() = new(false) end """ - try_enter!(f, s::OneOnlyRegion) + try_enter!(f, s::OnlyOneRegion) -When called from multiple parallel tasks (on a shared `s::OneOnlyRegion`) only a single +When called from multiple parallel tasks (on a shared `s::OnlyOneRegion`) only a single task will execute `f`. ## Example ```julia using OhMyThreads: @tasks -using OhMyThreads.Tools: OneOnlyRegion, try_enter! +using OhMyThreads.Tools: OnlyOneRegion, try_enter! -one_only = OneOnlyRegion() +only_one = OnlyOneRegion() @tasks for i in 1:10 @set ntasks = 10 println(i, ": before") - try_enter!(one_only) do + try_enter!(only_one) do println(i, ": only printed by a single task") sleep(1) end @@ -61,7 +61,7 @@ one_only = OneOnlyRegion() end ``` """ -function try_enter!(f, s::OneOnlyRegion) +function try_enter!(f, s::OnlyOneRegion) latch = @atomic :monotonic s.latch if latch return @@ -75,9 +75,9 @@ function try_enter!(f, s::OneOnlyRegion) end """ -Reset the `OneOnlyRegion` (so that it can be used again). +Reset the `OnlyOneRegion` (so that it can be used again). """ -function reset!(s::OneOnlyRegion) +function reset!(s::OnlyOneRegion) @atomicreplace s.latch true=>false nothing end diff --git a/test/runtests.jl b/test/runtests.jl index fa6bed26..9530c138 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -435,7 +435,7 @@ end end end - @testset "@one_only" begin + @testset "@only_one" begin x = 0 y = 0 try @@ -443,7 +443,7 @@ end @set ntasks = 10 y += 1 # not safe (race condition) - @one_only begin + @only_one begin x += 1 # parallel-safe because only a single task will execute this end end @@ -453,14 +453,14 @@ end end end - @testset "@one_only + @one_by_one" begin + @testset "@only_one + @one_by_one" begin x = 0 y = 0 try @tasks for i in 1:10 @set ntasks = 10 - @one_only begin + @only_one begin x += 1 # parallel-safe end