Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add special region support to @tasks #93

Merged
merged 7 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
OhMyThreads.jl Changelog
=========================

Version 0.5.1
-------------
- ![Feature][badge-feature] Within a parallel `@tasks` block one can now mark a region with `@one_by_one`. This region will be run by one task at a time ("critical region").
- ![Feature][badge-feature] Within a `@tasks` block one can now mark a region as with `@only_one`. This region will be run by a single parallel task only (other tasks will skip over it).

Version 0.5.0
-------------

Expand Down
2 changes: 2 additions & 0 deletions docs/src/refs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ CollapsedDocStrings = true
@tasks
@set
@local
@only_one
@one_by_one
```

### Functions
Expand Down
2 changes: 1 addition & 1 deletion src/OhMyThreads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using .Schedulers: Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler
SerialScheduler
include("implementation.jl")

export @tasks, @set, @local
export @tasks, @set, @local, @one_by_one, @only_one
export treduce, tmapreduce, treducemap, tmap, tmap!, tforeach, tcollect
export Scheduler, DynamicScheduler, StaticScheduler, GreedyScheduler, SerialScheduler

Expand Down
74 changes: 68 additions & 6 deletions src/macro_impl.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using OhMyThreads.Tools: OnlyOneRegion, try_enter!

function tasks_macro(forex)
if forex.head != :for
throw(ErrorException("Expected a for loop after `@tasks`."))
Expand All @@ -17,11 +19,26 @@ function tasks_macro(forex)

settings = Settings()

# Escape everything in the loop body that is not used in conjuction with one of our
# "macros", e.g. @set or @local. Code inside of these macro blocks will be escaped by
# the respective "macro" handling functions below.
for i in findall(forbody.args) do arg
!(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")) &&
!(arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@local")) &&
!(arg isa Expr && arg.head == :macrocall &&
arg.args[1] == Symbol("@only_one")) &&
!(arg isa Expr && arg.head == :macrocall &&
arg.args[1] == Symbol("@one_by_one"))
end
forbody.args[i] = esc(forbody.args[i])
end

locals_before, locals_names = _maybe_handle_atlocal_block!(forbody.args)
tls_names = isnothing(locals_before) ? [] : map(x -> x.args[1], locals_before)
_maybe_handle_atset_block!(settings, forbody.args)
setup_onlyone_blocks = _maybe_handle_atonlyone_blocks!(forbody.args)
setup_onebyone_blocks = _maybe_handle_atonebyone_blocks!(forbody.args)

forbody = esc(forbody)
itrng = esc(itrng)
itvar = esc(itvar)

Expand All @@ -39,19 +56,25 @@ function tasks_macro(forex)
end
q = if isgiven(settings.reducer)
quote
$setup_onlyone_blocks
$setup_onebyone_blocks
$make_mapping_function
tmapreduce(mapping_function, $(settings.reducer),
$(itrng))
end
elseif isgiven(settings.collect)
maybe_warn_useless_init(settings)
quote
$setup_onlyone_blocks
$setup_onebyone_blocks
$make_mapping_function
tmap(mapping_function, $(itrng))
end
else
maybe_warn_useless_init(settings)
quote
$setup_onlyone_blocks
$setup_onebyone_blocks
$make_mapping_function
tforeach(mapping_function, $(itrng))
end
Expand All @@ -68,7 +91,7 @@ function tasks_macro(forex)
for (k, v) in settings.kwargs
push!(kwexpr.args, Expr(:kw, k, v))
end
insert!(q.args[4].args, 2, kwexpr)
insert!(q.args[8].args, 2, kwexpr)

# wrap everything in a let ... end block
# and, potentially, define the `TaskLocalValue`s.
Expand Down Expand Up @@ -151,16 +174,15 @@ function _atlocal_assign_to_exprs(ex)
tls_type = esc(left_ex.args[2])
local_before = :($(tl_storage) = TaskLocalValue{$tls_type}(() -> $(tls_def)))
else
tls_sym = esc(left_ex)
tls_sym = esc(left_ex)
local_before = :($(tl_storage) = let f = () -> $(tls_def)
TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f)
end)
TaskLocalValue{Core.Compiler.return_type(f, Tuple{})}(f)
end)
end
local_name = :($(tls_sym))
return local_before, local_name
end


function _maybe_handle_atset_block!(settings, args)
idcs = findall(args) do arg
arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@set")
Expand Down Expand Up @@ -201,3 +223,43 @@ function _handle_atset_single_assign!(settings, ex)
push!(settings.kwargs, sym => esc(def))
end
end

function _maybe_handle_atonlyone_blocks!(args)
idcs = findall(args) do arg
arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@only_one")
end
isnothing(idcs) && return # no @only_one blocks
setup_onlyone_blocks = quote end
for i in idcs
body = args[i].args[3]
@gensym onlyone
init_onlyone_ex = :($(onlyone) = $(OnlyOneRegion()))
push!(setup_onlyone_blocks.args, init_onlyone_ex)
args[i] = quote
Tools.try_enter!($(onlyone)) do
$(esc(body))
end
end
end
return setup_onlyone_blocks
end

function _maybe_handle_atonebyone_blocks!(args)
idcs = findall(args) do arg
arg isa Expr && arg.head == :macrocall && arg.args[1] == Symbol("@one_by_one")
end
isnothing(idcs) && return # no @one_by_one blocks
setup_onebyone_blocks = quote end
for i in idcs
body = args[i].args[3]
@gensym onebyone
init_lock_ex = :($(onebyone) = $(Base.ReentrantLock()))
push!(setup_onebyone_blocks.args, init_lock_ex)
args[i] = quote
$(esc(:lock))($(onebyone)) do
$(esc(body))
end
end
end
return setup_onebyone_blocks
end
55 changes: 55 additions & 0 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,58 @@ end
error("The @local macro may only be used inside of a @tasks block.")
end
end

"""
@only_one begin ... end

This can be used inside a `@tasks for ... end` block to mark a region of code to be
executed by only one of the parallel tasks (all other tasks skip over this region).

## Example

```julia
using OhMyThreads: @tasks

@tasks for i in 1:10
@set ntasks = 10

println(i, ": before")
@only_one begin
println(i, ": only printed by a single task")
sleep(1)
end
println(i, ": after")
end
```
"""
macro only_one(args...)
error("The @only_one macro may only be used inside of a @tasks block.")
end

"""
@one_by_one begin ... end

This can be used inside a `@tasks for ... end` block to mark a region of code to be
executed by one parallel task at a time (i.e. exclusive access). The order may be arbitrary
and non-deterministic.

## Example

```julia
using OhMyThreads: @tasks

@tasks for i in 1:10
@set ntasks = 10

println(i, ": before")
@one_by_one begin
println(i, ": one task at a time")
sleep(0.5)
end
println(i, ": after")
end
```
"""
macro one_by_one(args...)
error("The @one_by_one macro may only be used inside of a @tasks block.")
end
58 changes: 58 additions & 0 deletions src/tools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,62 @@ Return a `UInt` identifier for the current running [Task](https://docs.julialang
"""
taskid() = objectid(current_task())

"""
May be used to mark a region in parallel code to be executed by a single task only
(all other tasks shall skip over it).

See [`try_enter!`](@ref) and [`reset!`](@ref).
"""
mutable struct OnlyOneRegion
@atomic latch::Bool
OnlyOneRegion() = new(false)
end

"""
try_enter!(f, s::OnlyOneRegion)

When called from multiple parallel tasks (on a shared `s::OnlyOneRegion`) only a single
task will execute `f`.

## Example

```julia
using OhMyThreads: @tasks
using OhMyThreads.Tools: OnlyOneRegion, try_enter!

only_one = OnlyOneRegion()

@tasks for i in 1:10
@set ntasks = 10

println(i, ": before")
try_enter!(only_one) do
println(i, ": only printed by a single task")
sleep(1)
end
println(i, ": after")
end
```
"""
function try_enter!(f, s::OnlyOneRegion)
latch = @atomic :monotonic s.latch
if latch
return
end
(_, success) = @atomicreplace s.latch false=>true
if !success
return
end
f()
return
end

"""
Reset the `OnlyOneRegion` (so that it can be used again).
"""
function reset!(s::OnlyOneRegion)
@atomicreplace s.latch true=>false
nothing
end

end # Tools
Loading
Loading