Skip to content

Commit

Permalink
Simplify thread-safe configuration (#147)
Browse files Browse the repository at this point in the history
* Simplify thread-safe configuration

- use Atomix.jl  @atomic to atomically add to (single-element) arrays
  remove previous special-case AtomicScalar type
- a thread-safe model configuration is now defined by setting the top-level parameter 'threadsafe: true'
  in the YAML configuration file.  Reactions that need to handle thread-safety explicitly should read this parameter when creating methods.
- deprecate 'threadsafe' parameter from 'create_modeldata(...)', with deprecation check and error if set.

* Update Project.toml [compat] for Atomicx

* doc fix
  • Loading branch information
sjdaines authored Dec 16, 2024
1 parent bc82c7a commit ec7477e
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 126 deletions.
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ authors = ["Stuart Daines <[email protected]>"]
version = "0.21.36"

[deps]
Atomix = "a9b6321e-bd34-4604-b9c9-b65b8de01458"
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
DocStringExtensions = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae"
Expand All @@ -30,6 +31,7 @@ TimerOutputs = "a759f4b9-e2f1-59dc-863e-4aeb61b1ea8f"
YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6"

[compat]
Atomix = "0.1, 1.0"
BenchmarkTools = "1.0"
DataFrames = "1.1"
DocStringExtensions = "0.8, 0.9"
Expand Down
2 changes: 1 addition & 1 deletion docs/src/CreateInitializeLoop.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ The custom Vector of [`CellRange`](@ref)s should then be supplied to

### Multithreaded models
To use with a multithreaded solver eg for a spatially tiled model:
- Set `threadsafe=true` when calling [`create_modeldata`](@ref). The subsequent call to [`allocate_variables!`](@ref) will then allocate Julia `Threads.Atomic` variables for PALEO Variables with attribute `atomic: = true` (usually scalar accumulator variables for totals etc).
- Set the global parameter `threadsafe=true` in the YAML file before calling [`create_model_from_config`](@ref). This is used by Reactions that require special handling eg atomic operations to maintain thread safety (usually those with scalar accumulator variables for totals etc).
- Supply a `method_barrier` implementing a thread barrier function to [`initialize_reactiondata!`](@ref). Reactions are sorted into groups, where each group has no dependencies and later groups depend on earlier ones. The `method_barrier` will be inserted between these groups of independent Reactions.

### Automatic differentiation
Expand Down
7 changes: 7 additions & 0 deletions docs/src/Reaction API.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ zero_ad
value_ad
```

#### Writing thread-safe code

Reactions that accumulate per-cell quantities into a scalar variable (eg to calculate Domain totals) and are intended for use in
large models that use a multithreaded solver with tiled Domains should use `atomic_add!`:
```@docs
atomic_add!
```

#### Optimising loops over cells using explicit SIMD instructions
Reactions with simple loops over `cellindices` that implement time-consuming per-cell calculations
Expand Down
17 changes: 10 additions & 7 deletions src/Fields.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

import Infiltrator

############################################################
# (Function) Spaces
###########################################################
Expand Down Expand Up @@ -109,14 +109,17 @@ If the subtype needs to provide values for a numerical solver (eg as a state var
If the subtype has a representation as components, it should implement:
[`num_components`](@ref), [`get_components`](@ref)
If the subtype needs to provide a thread-safe atomic addition operation eg to provide scalar accumulator variables for Domain totals with a tiled model,
it should implement [`atomic_add!`](@ref) for the field `data_type`.
"""
AbstractData


"""
allocate_values(
field_data::Type{<:AbstractData}, data_dims::Tuple{Vararg{NamedDimension}}, data_type, space::Type{<:AbstractSpace}, spatial_size::Tuple;
thread_safe::Bool, allocatenans::Bool,
allocatenans::Bool,
) -> values
allocate `Field.values` (eg an Array) for `field_data` with dimensions defined by `spatial_size` and `data_dims`
Expand All @@ -127,7 +130,7 @@ function allocate_values(
data_type,
space::Type{<:AbstractSpace},
spatial_size::Tuple{Integer, Vararg{Integer}}, # an NTuple with at least one element
thread_safe::Bool, allocatenans::Bool,
allocatenans::Bool,
)
end

Expand Down Expand Up @@ -315,7 +318,7 @@ Convert Field `values` to a Vector of components
function get_components(values, field_data::Type{<:AbstractData}) end

"Optional: sanitize `values` for storing as model output.
Default implementation is usually OK - only implement eg for Atomic types that should be converted to standard types for storage"
Default implementation is usually OK - only implement for custom types that should be converted to standard types for storage"
get_values_output(values, data_type::Type{<:AbstractData}, data_dims::Tuple{Vararg{NamedDimension}}, space, mesh) = values


Expand Down Expand Up @@ -371,11 +374,11 @@ function add_field! end
"create a new Field, allocating `values` data arrays"
function allocate_field(
field_data::Type, data_dims::NTuple{N, NamedDimension}, data_type::Type, space::Type{<:AbstractSpace}, mesh;
thread_safe::Bool, allocatenans
allocatenans
) where {N}
v = allocate_values(
field_data, data_dims, data_type, space, spatial_size(space, mesh),
thread_safe=thread_safe, allocatenans=allocatenans,
field_data, data_dims, data_type, space, spatial_size(space, mesh);
allocatenans,
)

return Field{field_data, space, typeof(v), N, typeof(mesh)}(v, data_dims, mesh)
Expand Down
10 changes: 6 additions & 4 deletions src/Model.jl
Original file line number Diff line number Diff line change
Expand Up @@ -393,16 +393,18 @@ end


"""
create_modeldata(model::Model [, eltype] [; threadsafe]) -> modeldata::ModelData
create_modeldata(model::Model [; arrays_eltype=Float64]) -> modeldata::ModelData
Create a new [`ModelData`](@ref) struct for model variables of element type `eltype`.
Create a new [`ModelData`](@ref) struct for model variables with data arrays element type `arrays_eltype`.
"""
function create_modeldata(
model::Model, arrays_eltype::DataType=Float64;
threadsafe=false,
allocatenans=true, # fill Arrays with NaN when first allocated
threadsafe=false, # deprecated
)
modeldata = ModelData(model; arrays_eltype, threadsafe, allocatenans)
threadsafe == false || error("create_modeldata: 'threadsafe=true' no longer supported. Set parameter 'threadsafe: true' in YAML configuration file instead.")

modeldata = ModelData(model; arrays_eltype, allocatenans)

modeldata.cellranges_all = create_default_cellrange(model)

Expand Down
11 changes: 6 additions & 5 deletions src/ModelData.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ end


"""
ModelData(model::Model; arrays_eltype::DataType=Float64, threadsafe::Bool=false, allocatenans::Bool=true)
ModelData(model::Model; arrays_eltype::DataType=Float64, allocatenans::Bool=true)
Create a ModelData struct containing `model` data arrays.
Expand All @@ -37,7 +37,6 @@ which requires Dual numbers as the array element type.
Base.@kwdef mutable struct ModelData <: AbstractModelData
model::Model
domain_data::Vector{Tuple{DataType, String, Vector{DomainData}}} = Tuple{DataType, String, Vector{DomainData}}[]
threadsafe::Bool
allocatenans::Bool # fill with NaN when allocating

sorted_methodsdata_setup = nothing # only for arrays_idx = 1
Expand All @@ -49,11 +48,13 @@ Base.@kwdef mutable struct ModelData <: AbstractModelData
solver_view_all = nothing # only for arrays_idx = 1
end

function ModelData(model::Model; arrays_eltype::DataType=Float64, threadsafe::Bool=false, allocatenans::Bool=true)

function ModelData(
model::Model;
arrays_eltype::DataType=Float64,
allocatenans::Bool=true,
)
modeldata = ModelData(;
model,
threadsafe,
allocatenans,
)

Expand Down
2 changes: 1 addition & 1 deletion src/PALEOboxes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ using DocStringExtensions
import OrderedCollections
import Logging
import Printf
import Atomix

import PrecompileTools
import TimerOutputs: @timeit, @timeit_debug
Expand All @@ -37,7 +38,6 @@ include("Types.jl")
include("CoordsDims.jl")
include("Fields.jl")

include("data/AtomicScalar.jl")
include("data/ScalarData.jl")
include("data/ArrayScalarData.jl")
include("data/IsotopeData.jl")
Expand Down
14 changes: 4 additions & 10 deletions src/VariableDomain.jl
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ get_data(var::VariableDomain, modeldata::AbstractModelData, arrays_idx::Int=1) =
get_data(var::VariableDomain, domaindata::AbstractDomainData) = get_field(var, domaindata).values

"""
get_data(var::VariableDomain, modeldata::AbstractModelData, arrays_idx::Int) -> get_values_output(field.values)
get_data(var::VariableDomain, domaindata::AbstractDomainData) -> get_values_output(field.values)
get_data_output(var::VariableDomain, modeldata::AbstractModelData, arrays_idx::Int) -> get_values_output(field.values)
get_data_output(var::VariableDomain, domaindata::AbstractDomainData) -> get_values_output(field.values)
Get a sanitized version of Variable `var` data array for storing as output
from [`get_values_output`]@ref)`(`[`Field`](@ref).values`)`
Expand Down Expand Up @@ -265,12 +265,6 @@ function allocate_variables!(
mdeltype = get(eltypemap, mdeltype_str, Float64)
@debug "Variable $(fullname(v)) mdeltype $mdeltype_str -> $mdeltype"
end

thread_safe = false
if get_attribute(v, :atomic, false) && modeldata.threadsafe
@info " $(fullname(v)) allocating Atomic data"
thread_safe = true
end

field_data = get_attribute(v, :field_data)
space = get_attribute(v, :space)
Expand All @@ -297,8 +291,8 @@ function allocate_variables!(
else
# allocate new field array
v_field = allocate_field(
field_data, data_dims, mdeltype, space, v.domain.grid,
thread_safe=thread_safe, allocatenans=modeldata.allocatenans
field_data, data_dims, mdeltype, space, v.domain.grid;
allocatenans=modeldata.allocatenans
)
end

Expand Down
4 changes: 1 addition & 3 deletions src/data/ArrayScalarData.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ end

function allocate_values(
field_data::Type{ArrayScalarData}, data_dims::Tuple{NamedDimension, Vararg{NamedDimension}}, data_type, space::Type{<:AbstractSpace}, spatial_size::Tuple{Integer, Vararg{Integer}};
thread_safe, allocatenans,
allocatenans,
)
!thread_safe || throw(ArgumentError("ArrayScalarData with thread_safe==true"))

s = _size_arrayscalardata(data_dims, space, spatial_size)
d = Array{data_type, length(s)}(undef, s...)

Expand Down
42 changes: 0 additions & 42 deletions src/data/AtomicScalar.jl

This file was deleted.

24 changes: 7 additions & 17 deletions src/data/IsotopeData.jl
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ end

function allocate_values(
data::Type{IsotopeLinear}, data_dims::Tuple{}, data_type, space::Type{<:AbstractSpace}, spatial_size::Tuple{Integer, Vararg{Integer}};
thread_safe, allocatenans,
allocatenans,
)
v = allocate_values(ScalarData, data_dims, data_type, space, spatial_size, thread_safe=thread_safe, allocatenans=allocatenans)
v_moldelta = allocate_values(ScalarData, data_dims, data_type, space, spatial_size, thread_safe=thread_safe, allocatenans=allocatenans)
v = allocate_values(ScalarData, data_dims, data_type, space, spatial_size; allocatenans)
v_moldelta = allocate_values(ScalarData, data_dims, data_type, space, spatial_size; allocatenans)

return StructArrays.StructArray{IsotopeLinear{data_type, data_type}}((v, v_moldelta))
end
Expand Down Expand Up @@ -403,18 +403,8 @@ function create_accessor(
end

#############################################################
# Special cases for atomic scalar Vectors of IsotopeLinear
# Special cases for atomic addition
##########################################################
"type of an atomic scalar Vector of IsotopeLinear"
const AtomicIsotopeLinear = StructArrays.StructVector{
IsotopeLinear{Float64, Float64},
NamedTuple{(:v, :v_moldelta), Tuple{AtomicScalar{Float64}, AtomicScalar{Float64}}},
Int64
}

atomic_add!(ias::AtomicIsotopeLinear, iv::IsotopeLinear) = (atomic_add!(ias.v, iv.v); atomic_add!(ias.v_moldelta, iv.v_moldelta))

function get_values_output(values::AtomicIsotopeLinear, data_type::Type{IsotopeLinear}, data_dims::Tuple{}, space::Type{<:AbstractSpace}, mesh)
# convert to an IsotopeLinear for output
return StructArrays.StructArray( [values[]])
end

atomic_add!(ias::StructArrays.StructVector{<:IsotopeLinear}, iv::IsotopeLinear) = (atomic_add!(ias.v, iv.v); atomic_add!(ias.v_moldelta, iv.v_moldelta);)

24 changes: 12 additions & 12 deletions src/data/ScalarData.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@ get_components(values, field_data::Type{ScalarData}) = [values]

function allocate_values(
field_data::Type{ScalarData}, data_dims::Tuple{}, data_type, space::Type{<:AbstractSpace}, spatial_size::Tuple{Integer, Vararg{Integer}};
thread_safe, allocatenans,
allocatenans,
)

if thread_safe
spatial_size == (1, ) ||
throw(ArgumentError("thread_safe with spatial_size != (1, )"))
d = AtomicScalar{data_type}()
else
d = Array{data_type, length(spatial_size)}(undef, spatial_size...)
end
d = Array{data_type, length(spatial_size)}(undef, spatial_size...)

if allocatenans
fill!(d, NaN)
Expand All @@ -36,10 +30,6 @@ function allocate_values(
return d
end

function get_values_output(values::AtomicScalar, data_type::Type{ScalarData}, data_dims::Tuple{}, space, mesh)
return [values[]]
end

function check_values(
existing_values, field_data::Type{ScalarData}, data_dims::Tuple{}, data_type, space::Type{<:AbstractSpace}, spatial_size::Tuple{Integer, Vararg{Integer}}
)
Expand Down Expand Up @@ -83,6 +73,16 @@ function init_values!(
return nothing
end

"""
atomic_add!(values::AbstractArray, v)
Add `v` to `values[]` using Thread-safe atomic operation.
This is a generic fallback for ScalarData containing `values` of any `data_type`.
"""
atomic_add!(values::AbstractArray, v) = Atomix.@atomic values[] += v


function zero_values!(values, field_data::Type{ScalarData}, data_dims::Tuple{}, space::Type{ScalarSpace}, cellrange)
values[] = 0.0
return nothing
Expand Down
13 changes: 10 additions & 3 deletions src/reactioncatalog/Reservoirs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -628,16 +628,18 @@ function PB.register_methods!(rj::ReactionReservoirWellMixed)
end
PB.setfrozen!(rj.pars.initialization_type)

threadsafe = get(rj.external_parameters, "threadsafe", false)

PB.add_method_do!(rj, do_reservoir_well_mixed, (PB.VarList_namedtuple(vars),))

# add method to sum per-cell vec_sms to scalar sms
vars_sms = [
PB.VarDerivScalar( "R_sms", "mol yr-1", "scalar reservoir source-sinks",
attributes=(:field_data=>rj.pars.field_data[], :atomic=>true,)),
attributes=(:field_data=>rj.pars.field_data[],)),
PB.VarTarget( "R_vec_sms","mol yr-1", "vector reservoir source-sinks",
attributes=(:field_data=>rj.pars.field_data[],))
]
PB.add_method_do!(rj, do_reservoir_well_mixed_sms, (PB.VarList_namedtuple(vars_sms),))
PB.add_method_do!(rj, do_reservoir_well_mixed_sms, (PB.VarList_namedtuple(vars_sms),); p=threadsafe)

PB.add_method_initialize_zero_vars_default!(rj)

Expand Down Expand Up @@ -678,14 +680,19 @@ function do_reservoir_well_mixed_sms(
cellrange::PB.AbstractCellRange,
deltat,
)
threadsafe = m.p

# accumulate first into a subtotal,
# in order to minimise time spent with lock held if this is one tile of a threaded model
subtotal = zero(vars.R_sms[])
@inbounds for i in cellrange.indices
subtotal += vars.R_vec_sms[i]
end
PB.atomic_add!(vars.R_sms, subtotal)
if threadsafe
PB.atomic_add!(vars.R_sms, subtotal)
else
vars.R_sms[] += subtotal
end

return nothing
end
Expand Down
Loading

0 comments on commit ec7477e

Please sign in to comment.