From f2511962ca15f3aaf87d2571e3551e59dc05c694 Mon Sep 17 00:00:00 2001 From: "Gregory L. Wagner" Date: Mon, 26 Feb 2024 22:00:26 -0600 Subject: [PATCH] (0.90.8) Rules for `Time` interpolation / extrapolation for FieldTimeSeries and `InterpolatedField` (#3450) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Bump to 0.90.7 * Fix docstring for field time series * this should work? * fixing the show method * Allow OnDisk FieldTimeSeries without times * this should work for the update * time_extrapolation as property * positional argument for `InMemory` * better show * bugfix * adjust margin * bugfix * Change InMemory user interface to InMemory(chunk_size) * Add back lost architecture kwarg plus fix docstring * Not lost... hidden * Clean up time extrapolation * time_indexing -> time_extrapolation * Generalize _interpolate * Add a node definition for AbstractField * small vugfix * correction * now it should go * add a test * couple of more tests * better definitions of indics * more tests for clamp * should be ready * Clearer architecture determination in FieldTimeSeries * Fix FieldTimeSeries docstring * this works provided a Δt * some show method * tests should pass now * now it works * last bugfix * ok * some more explanation * Call data_summary from interior * Cosmetic change * Start cleaning up time_extrapolation * Fix Cycled time extrapolation * Working on Cyclical extrapolation for FieldTimeSeries * More progress * Its a masterpiece that may not wokr * some getindices * delete file * should work * remove time extrapolation * Less than 2 not ge 2 * canoot index with a tuple * typos * Get in memory to wrk on GPU * Iron out bugs that came up during ClimaOcean tests * Cosmetic update * Updates * Mind bending refactor * Remove unused filesd * Fix a few more bugs * Pretty things up * Fix a bug plus cosmetic updates * Hotfix for LatLonGrid y-periodic * Add time_index and clean up time_indices * Fix bug in GPUAdaptedFTS * 
Temporary fix for mean * Try to fix showing for GPU fieldtime series * Implement version of interpolate that accepts offsetarray * Fix bugs with interpolating_time_indices * Add method to show tim eindexing * Reduced dimensions for AbstractField and Field * Update RiBasedVerticalDiffusivity * Add comment to RiBasedVerticalDiffusivity * Make halo filling code easier to read * Add a helpful comment * Generalize PartlyInMemory to accept custom backends * Fix a sign error in implicit dissipative buoyancy flux * Better comments * Make interior work for FieldTimeSeries with custom backend * bump patch release * Get rid of unnecessary intermediate constructor for FieldTimeSeries * Add written_names utility * Add extra criterion on dissipative buoyancy flux calculation * Fix faulty show for FieldTimeSeries * Add flattened_unique_values for empty tuple * Fix up possible_field_time_series * Try to get mean of FieldTimeSeries working * mean(FieldTimeSeries) works now? * Bump Project to 0.90.8 * Export FieldDataset * Add conditional_length for FieldTimeSeries * Fix tests * fix test * fixed all tests? * bugfix * another small bugfix * Put back the CATKE bug * Update src/OutputReaders/set_field_time_series.jl Co-authored-by: Simone Silvestri --------- Co-authored-by: Simone Silvestri <33547697+simone-silvestri@users.noreply.github.com> Co-authored-by: Navid C. 
Constantinou Co-authored-by: Simone Silvestri --- Project.toml | 2 +- src/BoundaryConditions/fill_halo_regions.jl | 151 ++-- src/Fields/abstract_field.jl | 3 +- src/Fields/field.jl | 48 +- src/Fields/field_tuples.jl | 2 + src/Fields/interpolate.jl | 25 +- src/Fields/show_fields.jl | 6 +- src/Grids/latitude_longitude_grid.jl | 4 +- src/Models/Models.jl | 29 +- src/Oceananigans.jl | 2 +- src/OutputReaders/OutputReaders.jl | 37 +- .../adapted_field_time_series.jl | 44 -- .../extract_field_time_series.jl | 38 + src/OutputReaders/field_time_series.jl | 663 ++++++++++-------- .../field_time_series_indexing.jl | 267 +++++++ .../field_time_series_reductions.jl | 51 ++ .../gpu_adapted_field_time_series.jl | 53 -- .../memory_allocated_field_time_series.jl | 98 --- .../on_disk_field_time_series.jl | 66 -- src/OutputReaders/set_field_time_series.jl | 91 +++ src/OutputReaders/show_field_time_series.jl | 61 ++ src/OutputReaders/update_field_time_series.jl | 95 --- src/OutputWriters/OutputWriters.jl | 12 +- .../CATKEVerticalDiffusivities.jl | 45 +- .../ri_based_vertical_diffusivity.jl | 42 +- test/test_output_readers.jl | 49 +- 26 files changed, 1216 insertions(+), 768 deletions(-) delete mode 100644 src/OutputReaders/adapted_field_time_series.jl create mode 100644 src/OutputReaders/extract_field_time_series.jl create mode 100644 src/OutputReaders/field_time_series_indexing.jl create mode 100644 src/OutputReaders/field_time_series_reductions.jl delete mode 100644 src/OutputReaders/gpu_adapted_field_time_series.jl delete mode 100644 src/OutputReaders/memory_allocated_field_time_series.jl delete mode 100644 src/OutputReaders/on_disk_field_time_series.jl create mode 100644 src/OutputReaders/set_field_time_series.jl create mode 100644 src/OutputReaders/show_field_time_series.jl delete mode 100644 src/OutputReaders/update_field_time_series.jl diff --git a/Project.toml b/Project.toml index 2d62211cf6..aa53b72cf9 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = 
"Oceananigans" uuid = "9e8cae18-63c1-5223-a75c-80ca9d6e9a09" authors = ["Climate Modeling Alliance and contributors"] -version = "0.90.7" +version = "0.90.8" [deps] Adapt = "79e6a3ab-5dfb-504d-930d-738a2a938a0e" diff --git a/src/BoundaryConditions/fill_halo_regions.jl b/src/BoundaryConditions/fill_halo_regions.jl index ed749618a4..3e16d2f260 100644 --- a/src/BoundaryConditions/fill_halo_regions.jl +++ b/src/BoundaryConditions/fill_halo_regions.jl @@ -81,16 +81,16 @@ end # position [1] and the associated boundary conditions in position [2] function permute_boundary_conditions(boundary_conditions) - split_x_boundaries = split_boundary(extract_west_bc(boundary_conditions), extract_east_bc(boundary_conditions)) - split_y_boundaries = split_boundary(extract_south_bc(boundary_conditions), extract_north_bc(boundary_conditions)) + split_x_halo_filling = split_halo_filling(extract_west_bc(boundary_conditions), extract_east_bc(boundary_conditions)) + split_y_halo_filling = split_halo_filling(extract_south_bc(boundary_conditions), extract_north_bc(boundary_conditions)) west_bc = extract_west_bc(boundary_conditions) east_bc = extract_east_bc(boundary_conditions) south_bc = extract_south_bc(boundary_conditions) north_bc = extract_north_bc(boundary_conditions) - if split_x_boundaries - if split_y_boundaries + if split_x_halo_filling + if split_y_halo_filling fill_halos! = [fill_west_halo!, fill_east_halo!, fill_south_halo!, fill_north_halo!, fill_bottom_and_top_halo!] sides = [:west, :east, :south, :north, :bottom_and_top] bcs_array = [west_bc, east_bc, south_bc, north_bc, extract_bottom_bc(boundary_conditions)] @@ -100,7 +100,7 @@ function permute_boundary_conditions(boundary_conditions) bcs_array = [west_bc, east_bc, south_bc, extract_bottom_bc(boundary_conditions)] end else - if split_y_boundaries + if split_y_halo_filling fill_halos! = [fill_west_and_east_halo!, fill_south_halo!, fill_north_halo!, fill_bottom_and_top_halo!] 
sides = [:west_and_east, :south, :north, :bottom_and_top] bcs_array = [west_bc, south_bc, north_bc, extract_bottom_bc(boundary_conditions)] @@ -122,17 +122,17 @@ end # Split direction in two distinct fill_halo! events in case of a communication boundary condition # (distributed DCBC), paired with a Flux, Value or Gradient boundary condition -split_boundary(bcs1, bcs2) = false -split_boundary(::DCBC, ::DCBC) = false -split_boundary(bcs1, ::DCBC) = true -split_boundary(::DCBC, bcs2) = true +split_halo_filling(bcs1, bcs2) = false +split_halo_filling(::DCBC, ::DCBC) = false +split_halo_filling(bcs1, ::DCBC) = true +split_halo_filling(::DCBC, bcs2) = true # TODO: support heterogeneous distributed-shared communication -# split_boundary(::MCBC, ::DCBC) = false -# split_boundary(::DCBC, ::MCBC) = false -# split_boundary(::MCBC, ::MCBC) = false -# split_boundary(bcs1, ::MCBC) = true -# split_boundary(::MCBC, bcs2) = true +# split_halo_filling(::MCBC, ::DCBC) = false +# split_halo_filling(::DCBC, ::MCBC) = false +# split_halo_filling(::MCBC, ::MCBC) = false +# split_halo_filling(bcs1, ::MCBC) = true +# split_halo_filling(::MCBC, bcs2) = true ##### ##### Halo filling order @@ -293,30 +293,47 @@ end ##### fill_west_halo!(c, bc, size, offset, loc, arch, grid, args...; kwargs...) = - launch!(arch, grid, KernelParameters(size, offset), _fill_only_west_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + launch!(arch, grid, KernelParameters(size, offset), + _fill_only_west_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + fill_east_halo!(c, bc, size, offset, loc, arch, grid, args...; kwargs...) = - launch!(arch, grid, KernelParameters(size, offset), _fill_only_east_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + launch!(arch, grid, KernelParameters(size, offset), + _fill_only_east_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + fill_south_halo!(c, bc, size, offset, loc, arch, grid, args...; kwargs...) 
= - launch!(arch, grid, KernelParameters(size, offset), _fill_only_south_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + launch!(arch, grid, KernelParameters(size, offset), + _fill_only_south_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + fill_north_halo!(c, bc, size, offset, loc, arch, grid, args...; kwargs...) = - launch!(arch, grid, KernelParameters(size, offset), _fill_only_north_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + launch!(arch, grid, KernelParameters(size, offset), + _fill_only_north_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + fill_bottom_halo!(c, bc, size, offset, loc, arch, grid, args...; kwargs...) = - launch!(arch, grid, KernelParameters(size, offset), _fill_only_bottom_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + launch!(arch, grid, KernelParameters(size, offset), + _fill_only_bottom_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + fill_top_halo!(c, bc, size, offset, loc, arch, grid, args...; kwargs...) = - launch!(arch, grid, KernelParameters(size, offset), _fill_only_top_halo!, c, bc, loc, grid, Tuple(args); kwargs...) + launch!(arch, grid, KernelParameters(size, offset), + _fill_only_top_halo!, c, bc, loc, grid, Tuple(args); kwargs...) ##### ##### Kernel launchers for double-sided fill_halos ##### -fill_west_and_east_halo!(c, west_bc, east_bc, size, offset, loc, arch, grid, args...; kwargs...) = - launch!(arch, grid, KernelParameters(size, offset), _fill_west_and_east_halo!, c, west_bc, east_bc, loc, grid, Tuple(args); kwargs...) +function fill_west_and_east_halo!(c, west_bc, east_bc, size, offset, loc, arch, grid, args...; kwargs...) + return launch!(arch, grid, KernelParameters(size, offset), + _fill_west_and_east_halo!, c, west_bc, east_bc, loc, grid, Tuple(args); kwargs...) +end -fill_south_and_north_halo!(c, south_bc, north_bc, size, offset, loc, arch, grid, args...; kwargs...) 
= - launch!(arch, grid, KernelParameters(size, offset), _fill_south_and_north_halo!, c, south_bc, north_bc, loc, grid, Tuple(args); kwargs...) +function fill_south_and_north_halo!(c, south_bc, north_bc, size, offset, loc, arch, grid, args...; kwargs...) + return launch!(arch, grid, KernelParameters(size, offset), + _fill_south_and_north_halo!, c, south_bc, north_bc, loc, grid, Tuple(args); kwargs...) +end -fill_bottom_and_top_halo!(c, bottom_bc, top_bc, size, offset, loc, arch, grid, args...; kwargs...) = - launch!(arch, grid, KernelParameters(size, offset), _fill_bottom_and_top_halo!, c, bottom_bc, top_bc, loc, grid, Tuple(args); kwargs...) +function fill_bottom_and_top_halo!(c, bottom_bc, top_bc, size, offset, loc, arch, grid, args...; kwargs...) + return launch!(arch, grid, KernelParameters(size, offset), + _fill_bottom_and_top_halo!, c, bottom_bc, top_bc, loc, grid, Tuple(args); kwargs...) +end ##### ##### Calculate kernel size and offset for Windowed and Sliced Fields @@ -331,7 +348,9 @@ const TBB = Union{typeof(fill_bottom_and_top_halo!), typeof(fill_bottom_halo!), @inline fill_halo_size(::Tuple, ::SNB, args...) = :xz @inline fill_halo_size(::Tuple, ::TBB, args...) = :xy -# If indices are colon, fill the whole boundary plane! +# If indices are colon, and locations are _not_ Nothing, fill the whole boundary plane! +# If locations are _Nothing_, then the kwarg `reduced_dimensions` will allow the size `:xz` +# to be correctly interpreted inside `launch!`. @inline fill_halo_size(::OffsetArray, ::WEB, ::Tuple{<:Any, <:Colon, <:Colon}, args...) = :yz @inline fill_halo_size(::OffsetArray, ::SNB, ::Tuple{<:Colon, <:Any, <:Colon}, args...) = :xz @inline fill_halo_size(::OffsetArray, ::TBB, ::Tuple{<:Colon, <:Colon, <:Any}, args...) 
= :xy @@ -343,21 +362,73 @@ const TBB = Union{typeof(fill_bottom_and_top_halo!), typeof(fill_bottom_halo!), @inline whole_halo(::Colon, ::Nothing) = false @inline whole_halo(::Colon, loc) = true -# Calculate kernel size -@inline fill_halo_size(c::OffsetArray, ::WEB, idx, bc, loc, grid) = - @inbounds (ifelse(whole_halo(idx[2], loc[2]), size(grid, 2), size(c, 2)), ifelse(whole_halo(idx[3], loc[3]), size(grid, 3), size(c, 3))) -@inline fill_halo_size(c::OffsetArray, ::SNB, idx, bc, loc, grid) = - @inbounds (ifelse(whole_halo(idx[1], loc[1]), size(grid, 1), size(c, 1)), ifelse(whole_halo(idx[3], loc[3]), size(grid, 3), size(c, 3))) -@inline fill_halo_size(c::OffsetArray, ::TBB, idx, bc, loc, grid) = - @inbounds (ifelse(whole_halo(idx[1], loc[1]), size(grid, 1), size(c, 1)), ifelse(whole_halo(idx[2], loc[2]), size(grid, 2), size(c, 2))) +# Calculate kernel size for windowed fields. This code is only called when +# one or more of the elements of `idx` is not Colon in the two direction perpendicular +# to the halo region and `bc` is not `PeriodicBoundaryCondition`. 
+@inline function fill_halo_size(c::OffsetArray, ::WEB, idx, bc, loc, grid) + @inbounds begin + whole_y_halo = whole_halo(idx[2], loc[2]) + whole_z_halo = whole_halo(idx[3], loc[3]) + end + + _, Ny, Nz = size(grid) + _, Cy, Cz = size(c) + + Sy = ifelse(whole_y_halo, Ny, Cy) + Sz = ifelse(whole_z_halo, Nz, Cz) + + return (Sy, Sz) +end + +@inline function fill_halo_size(c::OffsetArray, ::SNB, idx, bc, loc, grid) + @inbounds begin + whole_x_halo = whole_halo(idx[1], loc[1]) + whole_z_halo = whole_halo(idx[3], loc[3]) + end + + Nx, _, Nz = size(grid) + Cx, _, Cz = size(c) + + Sx = ifelse(whole_x_halo, Nx, Cx) + Sz = ifelse(whole_z_halo, Nz, Cz) + + return (Sx, Sz) +end + +@inline function fill_halo_size(c::OffsetArray, ::TBB, idx, bc, loc, grid) + @inbounds begin + whole_x_halo = whole_halo(idx[1], loc[1]) + whole_y_halo = whole_halo(idx[2], loc[2]) + end + + Nx, Ny, _ = size(grid) + Cx, Cy, _ = size(c) + + Sx = ifelse(whole_x_halo, Nx, Cx) + Sy = ifelse(whole_y_halo, Ny, Cy) + + return (Sx, Sy) +end # Remember that Periodic BCs also fill halo points! -@inline fill_halo_size(c::OffsetArray, ::WEB, idx, ::PBC, args...) = @inbounds size(c)[[2, 3]] -@inline fill_halo_size(c::OffsetArray, ::SNB, idx, ::PBC, args...) = @inbounds size(c)[[1, 3]] -@inline fill_halo_size(c::OffsetArray, ::TBB, idx, ::PBC, args...) = @inbounds size(c)[[1, 2]] -@inline fill_halo_size(c::OffsetArray, ::WEB, ::Tuple{<:Any, <:Colon, <:Colon}, ::PBC, args...) = @inbounds size(c)[[2, 3]] -@inline fill_halo_size(c::OffsetArray, ::SNB, ::Tuple{<:Colon, <:Any, <:Colon}, ::PBC, args...) = @inbounds size(c)[[1, 3]] -@inline fill_halo_size(c::OffsetArray, ::TBB, ::Tuple{<:Colon, <:Colon, <:Any}, ::PBC, args...) = @inbounds size(c)[[1, 2]] +@inline fill_halo_size(c::OffsetArray, ::WEB, idx, ::PBC, args...) = tuple(size(c, 2), size(c, 3)) +@inline fill_halo_size(c::OffsetArray, ::SNB, idx, ::PBC, args...) = tuple(size(c, 1), size(c, 3)) +@inline fill_halo_size(c::OffsetArray, ::TBB, idx, ::PBC, args...) 
= tuple(size(c, 1), size(c, 2)) + +@inline function fill_halo_size(c::OffsetArray, ::WEB, ::Tuple{<:Any, <:Colon, <:Colon}, ::PBC, args...) + _, Cy, Cz = size(c) + return (Cy, Cz) +end + +@inline function fill_halo_size(c::OffsetArray, ::SNB, ::Tuple{<:Colon, <:Any, <:Colon}, ::PBC, args...) + Cx, _, Cz = size(c) + return (Cx, Cz) +end + +@inline function fill_halo_size(c::OffsetArray, ::TBB, ::Tuple{<:Colon, <:Colon, <:Any}, ::PBC, args...) + Cx, Cy, _ = size(c) + return (Cx, Cy) +end # The offsets are non-zero only if the indices are not Colon @inline fill_halo_offset(::Symbol, args...) = (0, 0) diff --git a/src/Fields/abstract_field.jl b/src/Fields/abstract_field.jl index 41741ea8ef..f4d4fde9d8 100644 --- a/src/Fields/abstract_field.jl +++ b/src/Fields/abstract_field.jl @@ -12,7 +12,7 @@ import Base: minimum, maximum, extrema import Oceananigans: location, instantiated_location import Oceananigans.Architectures: architecture import Oceananigans.Grids: interior_x_indices, interior_y_indices, interior_z_indices -import Oceananigans.Grids: total_size, topology, nodes, xnodes, ynodes, znodes, xnode, ynode, znode +import Oceananigans.Grids: total_size, topology, nodes, xnodes, ynodes, znodes, node, xnode, ynode, znode import Oceananigans.Utils: datatuple const ArchOrNothing = Union{AbstractArchitecture, Nothing} @@ -71,6 +71,7 @@ interior(f::AbstractField) = f ##### Coordinates of fields ##### +@propagate_inbounds node(i, j, k, ψ::AbstractField) = node(i, j, k, ψ.grid, instantiated_location(ψ)...) @propagate_inbounds xnode(i, j, k, ψ::AbstractField) = xnode(i, j, k, ψ.grid, instantiated_location(ψ)...) @propagate_inbounds ynode(i, j, k, ψ::AbstractField) = ynode(i, j, k, ψ.grid, instantiated_location(ψ)...) @propagate_inbounds znode(i, j, k, ψ::AbstractField) = znode(i, j, k, ψ.grid, instantiated_location(ψ)...) 
diff --git a/src/Fields/field.jl b/src/Fields/field.jl index 6de12ae4e7..9337523cea 100644 --- a/src/Fields/field.jl +++ b/src/Fields/field.jl @@ -488,11 +488,15 @@ const XYReducedField = Field{Nothing, Nothing, <:Any} const XYZReducedField = Field{Nothing, Nothing, Nothing} -const ReducedField = Union{XReducedField, YReducedField, ZReducedField, - YZReducedField, XZReducedField, XYReducedField, +const ReducedField = Union{XReducedField, + YReducedField, + ZReducedField, + YZReducedField, + XZReducedField, + XYReducedField, XYZReducedField} -reduced_dimensions(field::Field) = () +reduced_dimensions(field::Field) = () reduced_dimensions(field::XReducedField) = tuple(1) reduced_dimensions(field::YReducedField) = tuple(2) reduced_dimensions(field::ZReducedField) = tuple(3) @@ -551,6 +555,24 @@ end ##### Field reductions ##### +const XReducedAbstractField = AbstractField{Nothing} +const YReducedAbstractField = AbstractField{<:Any, Nothing} +const ZReducedAbstractField = AbstractField{<:Any, <:Any, Nothing} + +const YZReducedAbstractField = AbstractField{<:Any, Nothing, Nothing} +const XZReducedAbstractField = AbstractField{Nothing, <:Any, Nothing} +const XYReducedAbstractField = AbstractField{Nothing, Nothing, <:Any} + +const XYZReducedAbstractField = AbstractField{Nothing, Nothing, Nothing} + +const ReducedAbstractField = Union{XReducedAbstractField, + YReducedAbstractField, + ZReducedAbstractField, + YZReducedAbstractField, + XZReducedAbstractField, + XYReducedAbstractField, + XYZReducedAbstractField} + # TODO: needs test Statistics.dot(a::Field, b::Field) = mapreduce((x, y) -> x * y, +, interior(a), interior(b)) @@ -563,12 +585,12 @@ const MinimumReduction = typeof(Base.minimum!) const AllReduction = typeof(Base.all!) const AnyReduction = typeof(Base.any!) 
-initialize_reduced_field!(::SumReduction, f, r::ReducedField, c) = Base.initarray!(interior(r), f, Base.add_sum, true, interior(c)) -initialize_reduced_field!(::ProdReduction, f, r::ReducedField, c) = Base.initarray!(interior(r), f, Base.mul_prod, true, interior(c)) -initialize_reduced_field!(::AllReduction, f, r::ReducedField, c) = Base.initarray!(interior(r), f, &, true, interior(c)) -initialize_reduced_field!(::AnyReduction, f, r::ReducedField, c) = Base.initarray!(interior(r), f, |, true, interior(c)) -initialize_reduced_field!(::MaximumReduction, f, r::ReducedField, c) = Base.mapfirst!(f, interior(r), interior(c)) -initialize_reduced_field!(::MinimumReduction, f, r::ReducedField, c) = Base.mapfirst!(f, interior(r), interior(c)) +initialize_reduced_field!(::SumReduction, f, r::ReducedAbstractField, c) = Base.initarray!(interior(r), f, Base.add_sum, true, interior(c)) +initialize_reduced_field!(::ProdReduction, f, r::ReducedAbstractField, c) = Base.initarray!(interior(r), f, Base.mul_prod, true, interior(c)) +initialize_reduced_field!(::AllReduction, f, r::ReducedAbstractField, c) = Base.initarray!(interior(r), f, &, true, interior(c)) +initialize_reduced_field!(::AnyReduction, f, r::ReducedAbstractField, c) = Base.initarray!(interior(r), f, |, true, interior(c)) +initialize_reduced_field!(::MaximumReduction, f, r::ReducedAbstractField, c) = Base.mapfirst!(f, interior(r), interior(c)) +initialize_reduced_field!(::MinimumReduction, f, r::ReducedAbstractField, c) = Base.mapfirst!(f, interior(r), interior(c)) filltype(f, c) = eltype(c) filltype(::Union{AllReduction, AnyReduction}, grid) = Bool @@ -624,7 +646,7 @@ for reduction in (:sum, :maximum, :minimum, :all, :any, :prod) # In-place function Base.$(reduction!)(f::Function, - r::ReducedField, + r::ReducedAbstractField, a::AbstractField; condition = nothing, mask = get_neutral_mask(Base.$(reduction!)), @@ -636,7 +658,7 @@ for reduction in (:sum, :maximum, :minimum, :all, :any, :prod) kwargs...) 
end - function Base.$(reduction!)(r::ReducedField, + function Base.$(reduction!)(r::ReducedAbstractField, a::AbstractField; condition = nothing, mask = get_neutral_mask(Base.$(reduction!)), @@ -689,7 +711,7 @@ end Statistics.mean(f::Function, c::AbstractField; condition = nothing, dims=:) = Statistics._mean(f, c, dims; condition) Statistics.mean(c::AbstractField; condition = nothing, dims=:) = Statistics._mean(identity, c, dims; condition) -function Statistics.mean!(f::Function, r::ReducedField, a::AbstractField; condition = nothing, mask = 0) +function Statistics.mean!(f::Function, r::ReducedAbstractField, a::AbstractField; condition = nothing, mask = 0) sum!(f, r, a; condition, mask, init=true) dims = reduced_dimension(location(r)) n = conditional_length(condition_operand(f, a, condition, mask), dims) @@ -697,7 +719,7 @@ function Statistics.mean!(f::Function, r::ReducedField, a::AbstractField; condit return r end -Statistics.mean!(r::ReducedField, a::AbstractArray; kwargs...) = Statistics.mean!(identity, r, a; kwargs...) +Statistics.mean!(r::ReducedAbstractField, a::AbstractArray; kwargs...) = Statistics.mean!(identity, r, a; kwargs...) function Statistics.norm(a::AbstractField; condition = nothing) r = zeros(a.grid, 1) diff --git a/src/Fields/field_tuples.jl b/src/Fields/field_tuples.jl index 1b28b2ee1c..ff426b1c68 100644 --- a/src/Fields/field_tuples.jl +++ b/src/Fields/field_tuples.jl @@ -4,6 +4,8 @@ using Oceananigans.BoundaryConditions: FieldBoundaryConditions, regularize_field ##### `fill_halo_regions!` for tuples of `Field` ##### +@inline flattened_unique_values(::Tuple{}) = tuple() + """ flattened_unique_values(a::NamedTuple) diff --git a/src/Fields/interpolate.jl b/src/Fields/interpolate.jl index ee75df3908..67b828e975 100644 --- a/src/Fields/interpolate.jl +++ b/src/Fields/interpolate.jl @@ -55,8 +55,9 @@ end end ##### -##### Disclaimer! interpolation on LatitudeLongitudeGrid assumes a thin shell (i.e. 
no curvature effects when interpolating) -##### Use other methods if a more accurate interpolation is required +##### Note: interpolation on LatitudeLongitudeGrid assumes a thin shell +##### (i.e. curvature effects are not incorporated when interpolating). +##### Use other methods if a more accurate interpolation is required. ##### @inline fractional_x_index(x, locs, grid::XFlatGrid) = zero(grid) @@ -142,7 +143,7 @@ end end """ - fractional_indices(x, y, z, loc, grid) + fractional_indices(x, y, z, grid, loc...) Convert the coordinates `(x, y, z)` to _fractional_ indices on a regular rectilinear grid located at `loc`, where `loc` is a 3-tuple of `Center` and `Face`. Fractional indices are @@ -267,20 +268,20 @@ end @inline ϕ₇(ξ, η, ζ) = ξ * η * (1 - ζ) @inline ϕ₈(ξ, η, ζ) = ξ * η * ζ -@inline function _interpolate(data, ix, iy, iz) +@inline function _interpolate(data, ix, iy, iz, in...) # Unpack the "interpolators" i⁻, i⁺, ξ = ix j⁻, j⁺, η = iy k⁻, k⁺, ζ = iz - return @inbounds ϕ₁(ξ, η, ζ) * data[i⁻, j⁻, k⁻] + - ϕ₂(ξ, η, ζ) * data[i⁻, j⁻, k⁺] + - ϕ₃(ξ, η, ζ) * data[i⁻, j⁺, k⁻] + - ϕ₄(ξ, η, ζ) * data[i⁻, j⁺, k⁺] + - ϕ₅(ξ, η, ζ) * data[i⁺, j⁻, k⁻] + - ϕ₆(ξ, η, ζ) * data[i⁺, j⁻, k⁺] + - ϕ₇(ξ, η, ζ) * data[i⁺, j⁺, k⁻] + - ϕ₈(ξ, η, ζ) * data[i⁺, j⁺, k⁺] + return @inbounds ϕ₁(ξ, η, ζ) * getindex(data, i⁻, j⁻, k⁻, in...) + + ϕ₂(ξ, η, ζ) * getindex(data, i⁻, j⁻, k⁺, in...) + + ϕ₃(ξ, η, ζ) * getindex(data, i⁻, j⁺, k⁻, in...) + + ϕ₄(ξ, η, ζ) * getindex(data, i⁻, j⁺, k⁺, in...) + + ϕ₅(ξ, η, ζ) * getindex(data, i⁺, j⁻, k⁻, in...) + + ϕ₆(ξ, η, ζ) * getindex(data, i⁺, j⁻, k⁺, in...) + + ϕ₇(ξ, η, ζ) * getindex(data, i⁺, j⁺, k⁻, in...) + + ϕ₈(ξ, η, ζ) * getindex(data, i⁺, j⁺, k⁺, in...) 
end """ diff --git a/src/Fields/show_fields.jl b/src/Fields/show_fields.jl index d426e54537..d00973c2f5 100644 --- a/src/Fields/show_fields.jl +++ b/src/Fields/show_fields.jl @@ -27,9 +27,9 @@ function Base.summary(field::Field) return string(prefix, suffix) end -data_summary(field) = string("max=", prettysummary(maximum(field)), ", ", - "min=", prettysummary(minimum(field)), ", ", - "mean=", prettysummary(mean(field))) +data_summary(data) = string("max=", prettysummary(maximum(data)), ", ", + "min=", prettysummary(minimum(data)), ", ", + "mean=", prettysummary(mean(data))) indices_summary(field) = replace(string(field.indices), "Colon()"=> ":") diff --git a/src/Grids/latitude_longitude_grid.jl b/src/Grids/latitude_longitude_grid.jl index eb0855dcbc..f6e2d7a887 100644 --- a/src/Grids/latitude_longitude_grid.jl +++ b/src/Grids/latitude_longitude_grid.jl @@ -437,10 +437,10 @@ end ##### Kernels that precompute the z- and x-metric ##### -@inline metric_worksize(grid::LatitudeLongitudeGrid) = (length(grid.Δλᶜᵃᵃ), length(grid.φᵃᶜᵃ) - 1) +@inline metric_worksize(grid::LatitudeLongitudeGrid) = (length(grid.Δλᶜᵃᵃ), length(grid.φᵃᶠᵃ) - 2) @inline metric_workgroup(grid::LatitudeLongitudeGrid) = (16, 16) -@inline metric_worksize(grid::XRegularLLG) = length(grid.φᵃᶜᵃ) - 1 +@inline metric_worksize(grid::XRegularLLG) = length(grid.φᵃᶠᵃ) - 2 @inline metric_workgroup(grid::XRegularLLG) = 16 function precompute_curvilinear_metrics!(grid, Δxᶠᶜ, Δxᶜᶠ, Δxᶠᶠ, Δxᶜᶜ, Azᶠᶜ, Azᶜᶠ, Azᶠᶠ, Azᶜᶜ) diff --git a/src/Models/Models.jl b/src/Models/Models.jl index 44bf3308e4..f7c871287b 100644 --- a/src/Models/Models.jl +++ b/src/Models/Models.jl @@ -13,7 +13,7 @@ using Oceananigans: AbstractModel, fields, prognostic_fields using Oceananigans.Grids: AbstractGrid, halo_size, inflate_halo_size using Oceananigans.TimeSteppers: AbstractTimeStepper, Clock using Oceananigans.Utils: Time -using Oceananigans.Fields: AbstractField, Field, flattened_unique_values +using Oceananigans.Fields: AbstractField, 
Field, flattened_unique_values, boundary_conditions using Oceananigans.AbstractOperations: AbstractOperation using Oceananigans.Advection: AbstractAdvectionScheme, CenteredSecondOrder, VectorInvariant @@ -21,7 +21,7 @@ import Oceananigans: initialize! import Oceananigans.Architectures: architecture import Oceananigans.TimeSteppers: reset! -using Oceananigans.OutputReaders: update_field_time_series!, extract_field_timeseries +using Oceananigans.OutputReaders: update_field_time_series!, extract_field_time_series # A prototype interface for AbstractModel. # @@ -114,6 +114,19 @@ const OceananigansModels = Union{HydrostaticFreeSurfaceModel, NonhydrostaticModel, ShallowWaterModel} +""" + possible_field_time_series(model::HydrostaticFreeSurfaceModel) + +Return a `Tuple` containing properties of and `OceananigansModel` that could contain `FieldTimeSeries`. +""" +function possible_field_time_series(model::OceananigansModels) + forcing = model.forcing + model_fields = fields(model) + # Note: we may need to include other objects in the tuple below, + # such as model.diffusivity_fields + return tuple(model_fields, forcing) +end + # Update _all_ `FieldTimeSeries`es in an `OceananigansModel`. # Extract `FieldTimeSeries` from all property names that might contain a `FieldTimeSeries` # Flatten the resulting tuple by extracting unique values and set! 
them to the @@ -122,8 +135,7 @@ function update_model_field_time_series!(model::OceananigansModels, clock::Clock time = Time(clock.time) possible_fts = possible_field_time_series(model) - - time_series_tuple = extract_field_timeseries(possible_fts) + time_series_tuple = extract_field_time_series(possible_fts) time_series_tuple = flattened_unique_values(time_series_tuple) for fts in time_series_tuple @@ -132,14 +144,7 @@ function update_model_field_time_series!(model::OceananigansModels, clock::Clock return nothing end - -""" - possible_field_time_series(model::HydrostaticFreeSurfaceModel) - -Return a `Tuple` containing properties of and `OceananigansModel` that could contain `FieldTimeSeries`. -""" -possible_field_time_series(model::OceananigansModels) = tuple(fields(model), model.forcing, model.diffusivity_fields) - + import Oceananigans.TimeSteppers: reset! function reset!(model::OceananigansModels) diff --git a/src/Oceananigans.jl b/src/Oceananigans.jl index b296bde813..6e74ae3180 100644 --- a/src/Oceananigans.jl +++ b/src/Oceananigans.jl @@ -103,7 +103,7 @@ export # Output writers NetCDFOutputWriter, JLD2OutputWriter, Checkpointer, TimeInterval, IterationInterval, AveragedTimeInterval, SpecifiedTimes, - AndSchedule, OrSchedule, + AndSchedule, OrSchedule, written_names, # Output readers FieldTimeSeries, FieldDataset, InMemory, OnDisk, diff --git a/src/OutputReaders/OutputReaders.jl b/src/OutputReaders/OutputReaders.jl index 1542e4ca71..b81622b584 100644 --- a/src/OutputReaders/OutputReaders.jl +++ b/src/OutputReaders/OutputReaders.jl @@ -1,35 +1,18 @@ module OutputReaders +export FieldDataset +export FieldTimeSeries export InMemory, OnDisk -export FieldTimeSeries, FieldDataset - -abstract type AbstractDataBackend end - -mutable struct InMemory{I} <: AbstractDataBackend - index_range :: I -end - -function InMemory(; chunk_size = Colon()) - index_range = if chunk_size isa Colon - Colon() - else - UnitRange(1, chunk_size) - end - - return InMemory(index_range) -end 
- -struct OnDisk <: AbstractDataBackend end - -# validate_backend(::InMemory{Nothing}, data) = InMemory(collect(1:size(data, 4))) -# validate_backend(::OnDisk, data) = OnDisk() -# validate_backend(in_memory::InMemory, data) = in_memory +export Cyclical, Linear, Clamp include("field_time_series.jl") -include("memory_allocated_field_time_series.jl") -include("on_disk_field_time_series.jl") -include("gpu_adapted_field_time_series.jl") -include("update_field_time_series.jl") +include("field_time_series_indexing.jl") +include("set_field_time_series.jl") +include("field_time_series_reductions.jl") +include("show_field_time_series.jl") +include("extract_field_time_series.jl") + +# Experimental include("field_dataset.jl") end # module diff --git a/src/OutputReaders/adapted_field_time_series.jl b/src/OutputReaders/adapted_field_time_series.jl deleted file mode 100644 index b91632a270..0000000000 --- a/src/OutputReaders/adapted_field_time_series.jl +++ /dev/null @@ -1,44 +0,0 @@ -using Adapt - -struct GPUAdaptedFieldTimeSeries{LX, LY, LZ, T, D, χ} <: AbstractArray{T, 4} - data :: D - times :: χ - - function GPUAdaptedFieldTimeSeries{LX, LY, LZ, T}(data::D, - times::χ) where {T, LX, LY, LZ, D, χ} - return new{LX, LY, LZ, T, D, χ}(data, backend, times) - end -end - -Adapt.adapt_structure(to, fts::FieldTimeSeries{LX, LY, LZ}) where {LX, LY, LZ} = - GPUAdaptedFieldTimeSeries{LX, LY, LZ, eltype(fts.grid)}(adapt(to, fts.data), - adapt(to, fts.times)) - -@propagate_inbounds Base.lastindex(fts::GPUAdaptedFieldTimeSeries) = lastindex(fts.data) -@propagate_inbounds Base.lastindex(fts::GPUAdaptedFieldTimeSeries, dim) = lastindex(fts.data, dim) - -const XYFTS = FieldTimeSeries{<:Any, <:Any, Nothing} -const XZFTS = FieldTimeSeries{<:Any, Nothing, <:Any} -const YZFTS = FieldTimeSeries{Nothing, <:Any, <:Any} - -const XYGPUFTS = GPUAdaptedFieldTimeSeries{<:Any, <:Any, Nothing} -const XZGPUFTS = GPUAdaptedFieldTimeSeries{<:Any, Nothing, <:Any} -const YZGPUFTS = 
GPUAdaptedFieldTimeSeries{Nothing, <:Any, <:Any} - -# Handle `Nothing` locations to allow `getbc` to work -@propagate_inbounds Base.getindex(fts::XYGPUFTS, i::Int, j::Int, n) = fts[i, j, 1, n] -@propagate_inbounds Base.getindex(fts::XZGPUFTS, i::Int, k::Int, n) = fts[i, 1, k, n] -@propagate_inbounds Base.getindex(fts::YZGPUFTS, j::Int, k::Int, n) = fts[1, j, k, n] - -@propagate_inbounds Base.getindex(fts::XYFTS, i::Int, j::Int, n) = fts[i, j, 1, n] -@propagate_inbounds Base.getindex(fts::XZFTS, i::Int, k::Int, n) = fts[i, 1, k, n] -@propagate_inbounds Base.getindex(fts::YZFTS, j::Int, k::Int, n) = fts[1, j, k, n] - -# Only `getindex` for GPUAdaptedFieldTimeSeries, no need to `setindex` -Base.getindex(fts::GPUAdaptedFieldTimeSeries, i::Int, j::Int, k::Int, n::Int) = @inbounds fts.data[i, j, k, n] - -# Extend Linear time interpolation for GPUAdaptedFieldTimeSeries -@inline Base.getindex(fts::GPUAdaptedFieldTimeSeries, i::Int, j::Int, k::Int, time_index::Time) = - interpolating_get_index(fts, i, j, k, time_index) - - diff --git a/src/OutputReaders/extract_field_time_series.jl b/src/OutputReaders/extract_field_time_series.jl new file mode 100644 index 0000000000..c4fd943083 --- /dev/null +++ b/src/OutputReaders/extract_field_time_series.jl @@ -0,0 +1,38 @@ +using Oceananigans.AbstractOperations: AbstractOperation +using Oceananigans.Fields: flattened_unique_values + +##### +##### Utility for "extracting" FieldTimeSeries from large nested objects (eg models) +##### + +extract_field_time_series(t1, tn...) 
= extract_field_time_series(tuple(t1, tn...)) + +# Utility used to extract field time series from a type through recursion +function extract_field_time_series(t) + prop = propertynames(t) + if isempty(prop) + return nothing + end + + extracted = Tuple(extract_field_time_series(getproperty(t, p)) for p in prop) + flattened = flattened_unique_values(extracted) + + return flattened +end + +# Termination (move all here when we switch the code up) +extract_field_time_series(f::FieldTimeSeries) = f + +# For types that do not contain `FieldTimeSeries`, halt the recursion +CannotPossiblyContainFTS = (:Number, :AbstractArray) + +for T in CannotPossiblyContainFTS + @eval extract_field_time_series(::$T) = nothing +end + +# Special recursion rules for `Tuple` and `Field` types +extract_field_time_series(t::AbstractField) = Tuple(extract_field_time_series(getproperty(t, p)) for p in propertynames(t)) +extract_field_time_series(t::AbstractOperation) = Tuple(extract_field_time_series(getproperty(t, p)) for p in propertynames(t)) + +extract_field_time_series(t::Union{Tuple, NamedTuple}) = map(extract_field_time_series, t) + diff --git a/src/OutputReaders/field_time_series.jl b/src/OutputReaders/field_time_series.jl index ffc0d059fe..82aa537406 100644 --- a/src/OutputReaders/field_time_series.jl +++ b/src/OutputReaders/field_time_series.jl @@ -3,6 +3,7 @@ using Base: @propagate_inbounds using OffsetArrays using Statistics using JLD2 +using Adapt using Dates: AbstractTime using KernelAbstractions: @kernel, @index @@ -12,16 +13,206 @@ using Oceananigans.Grids using Oceananigans.Fields using Oceananigans.Grids: topology, total_size, interior_parent_indices, parent_index_range -using Oceananigans.Fields: show_location, interior_view_indices, data_summary, reduced_location, - index_binary_search, indices_summary, boundary_conditions + +using Oceananigans.Fields: interior_view_indices, index_binary_search, + indices_summary, boundary_conditions + using Oceananigans.Units: Time using 
Oceananigans.Utils: launch! import Oceananigans.Architectures: architecture -import Oceananigans.BoundaryConditions: fill_halo_regions! +import Oceananigans.BoundaryConditions: fill_halo_regions!, BoundaryCondition, getbc import Oceananigans.Fields: Field, set!, interior, indices, interpolate! -struct FieldTimeSeries{LX, LY, LZ, K, I, D, G, T, B, χ, P, N} <: AbstractField{LX, LY, LZ, G, T, 4} +##### +##### Data backends for FieldTimeSeries +##### + +abstract type AbstractDataBackend end +abstract type AbstractInMemoryBackend{S} end + +struct InMemory{S} <: AbstractInMemoryBackend{S} + start :: S + length :: S +end + +""" + InMemory(length=nothing) + +Return a `backend` for `FieldTimeSeries` that stores `size` +fields in memory. The default `size = nothing` stores all fields in memory. +""" +function InMemory(length::Int) + length < 2 && throw(ArgumentError("InMemory `length` must be 2 or greater.")) + return InMemory(1, length) +end + +InMemory() = InMemory(nothing, nothing) + +const TotallyInMemory = AbstractInMemoryBackend{Nothing} +const PartlyInMemory = AbstractInMemoryBackend{Int} + +Base.summary(backend::PartlyInMemory) = string("InMemory(", backend.start, ", ", length(backend), ")") +Base.summary(backend::TotallyInMemory) = "InMemory()" + +new_backend(::InMemory, start, length) = InMemory(start, length) + +""" + OnDisk() + +Return a lazy `backend` for `FieldTimeSeries` that keeps data +on disk, only loading it as requested by indexing into the +`FieldTimeSeries`. +""" +struct OnDisk <: AbstractDataBackend end + +##### +##### Time indexing modes for FieldTimeSeries +##### + +""" + Cyclical(period=nothing) + +Specifies cyclical FieldTimeSeries linear Time extrapolation. 
If +`period` is not specified, it is inferred from the `fts::FieldTimeSeries` via + +```julia +t = fts.times +Δt = t[end] - t[end-1] +period = t[end] - t[1] + Δt +``` +""" +struct Cyclical{FT} + period :: FT +end + +Cyclical() = Cyclical(nothing) + +""" + Linear() + +Specifies FieldTimeSeries linear Time extrapolation. +""" +struct Linear end + +""" + Clamp() + +Specifies FieldTimeSeries Time extrapolation that returns data from the nearest value. +""" +struct Clamp end # clamp to nearest value + +# Return the memory index associated with time index `n`. +# For example, if all data is in memory this is simply `n`. +# If only part of the data is in memory, then this is `n - n₀ + 1`, +# where n₀ is the time-index of the first memory index. +# +# TODO: do we need a special `memory_index` implementation for `Clamp` as well? +# For example, it may be better to get a bounds error. + +# Totally in memory stuff +@inline time_index(backend, ti, Nt, m) = m +@inline memory_index(backend, ti, Nt, n) = n +@inline memory_index(backend::TotallyInMemory, ::Cyclical, Nt, n) = mod1(n, Nt) +@inline memory_index(backend::TotallyInMemory, ::Clamp, Nt, n) = clamp(n, 1, Nt) + +# Partly in memory stuff +@inline shift_index(n, n₀) = n - (n₀ - 1) +@inline reverse_index(m, n₀) = m + n₀ - 1 + +@inline memory_index(backend::PartlyInMemory, ::Linear, Nt, n) = shift_index(n, backend.start) + +@inline function memory_index(backend::PartlyInMemory, ::Clamp, Nt, n) + n̂ = clamp(n, 1, Nt) + m = shift_index(n̂, backend.start) + return m +end + +""" + time_index(backend::PartlyInMemory, time_indexing, Nt, m) + +Compute the time index of a snapshot currently stored at the memory index `m`, +given `backend`, `time_indexing`, and number of times `Nt`. 
+""" +@inline time_index(backend::PartlyInMemory, ::Union{Clamp, Linear}, Nt, m) = + reverse_index(m, backend.start) + +""" + memory_index(backend::PartlyInMemory, time_indexing, Nt, n) + +Compute the current index of a snapshot in memory that has +the time index `n`, given `backend`, `time_indexing`, and number of times `Nt`. + +Example +======= + +For `backend::PartlyInMemory` and `time_indexing::Cyclical`: + +# Simple shifting example +```julia +Nt = 5 +backend = InMemory(2, 3) # so we have (2, 3, 4) +n = 4 # so m̃ = 3 +m = 4 - (2 - 1) # = 3 +m̃ = mod1(3, 5) # = 3 ✓ +``` + +# Shifting + wrapping example +```julia +Nt = 5 +backend = InMemory(4, 3) # so we have (4, 5, 1) +n = 1 # so, the right answer is m̃ = 3 +m = 1 - (4 - 1) # = -2 +m̃ = mod1(-2, 5) # = 3 ✓ +``` + +# Another shifting + wrapping example +```julia +Nt = 5 +backend = InMemory(5, 3) # so we have (5, 1, 2) +n = 11 # so, the right answer is m̃ = 2 +m = 11 - (5 - 1) # = 7 +m̃ = mod1(7, 5) # = 2 ✓ +``` +""" +@inline function memory_index(backend::PartlyInMemory, ::Cyclical, Nt, n) + m = shift_index(n, backend.start) + m̃ = mod1(m, Nt) # wrap index + return m̃ +end + +@inline function time_index(backend::PartlyInMemory, ::Cyclical, Nt, m) + n = reverse_index(m, backend.start) + ñ = mod1(n, Nt) # wrap index + return ñ +end + +""" + time_indices(backend, time_indexing, Nt) + +Return a collection of the time indices that are currently in memory. +If `backend::TotallyInMemory` then return `1:length(times)`. 
+""" +function time_indices(backend::PartlyInMemory, time_indexing, Nt) + St = length(backend) + n₀ = backend.start + + time_indices = ntuple(St) do m + time_index(backend, time_indexing, Nt, m) + end + + return time_indices +end + +time_indices(::TotallyInMemory, time_indexing, Nt) = 1:Nt + +Base.length(backend::PartlyInMemory) = backend.length + +##### +##### FieldTimeSeries +##### + +mutable struct FieldTimeSeries{LX, LY, LZ, TI, K, I, D, G, ET, B, χ, P, N} <: AbstractField{LX, LY, LZ, G, ET, 4} data :: D grid :: G backend :: K @@ -30,28 +221,108 @@ struct FieldTimeSeries{LX, LY, LZ, K, I, D, G, T, B, χ, P, N} <: AbstractField{ times :: χ path :: P name :: N - + time_indexing :: TI + function FieldTimeSeries{LX, LY, LZ}(data::D, grid::G, backend::K, bcs::B, indices::I, - times::χ, - path::P, - name::N) where {LX, LY, LZ, K, D, G, B, χ, I, P, N} - T = eltype(data) - return new{LX, LY, LZ, K, I, D, G, T, B, χ, P, N}(data, grid, backend, bcs, - indices, times, path, name) + times, + path, + name, + time_indexing) where {LX, LY, LZ, K, D, G, B, I} + + ET = eltype(data) + + # Check consistency between `backend` and `times`. 
+ if backend isa PartlyInMemory && backend.length > length(times) + throw(ArgumentError("`backend.length` cannot be greater than `length(times)`.")) + end + + if times isa AbstractArray + # Try to convert to a range, cuz + time_range = range(first(times), last(times), length=length(times)) + if all(time_range .≈ times) # good enough for most + times = time_range + end + + times = arch_array(architecture(grid), times) + end + + if time_indexing isa Cyclical{Nothing} # we have to infer the period + Δt = times[end] - times[end-1] + period = times[end] - times[1] + Δt + time_indexing = Cyclical(period) + end + + χ = typeof(times) + TI = typeof(time_indexing) + P = typeof(path) + N = typeof(name) + + return new{LX, LY, LZ, TI, K, I, D, G, ET, B, χ, P, N}(data, grid, backend, bcs, + indices, times, path, name, + time_indexing) end end -architecture(fts::FieldTimeSeries) = architecture(fts.grid) +##### +##### Minimal implementation of FieldTimeSeries for use in GPU kernels +##### +##### Supports reduced locations + time-interpolation / extrapolation +##### -const TotallyInMemoryFieldTimeSeries{LX, LY, LZ} = FieldTimeSeries{LX, LY, LZ, <:InMemory{Colon}} -const InMemoryFieldTimeSeries{LX, LY, LZ} = FieldTimeSeries{LX, LY, LZ, <:InMemory} -const OnDiskFieldTimeSeries{LX, LY, LZ} = FieldTimeSeries{LX, LY, LZ, <:OnDisk} +struct GPUAdaptedFieldTimeSeries{LX, LY, LZ, TI, K, ET, D, χ} <: AbstractArray{ET, 4} + data :: D + times :: χ + backend :: K + time_indexing :: TI + + function GPUAdaptedFieldTimeSeries{LX, LY, LZ}(data::D, + times::χ, + backend::K, + time_indexing::TI) where {LX, LY, LZ, TI, K, D, χ} -struct UnspecifiedBoundaryConditions end + ET = eltype(data) + return new{LX, LY, LZ, TI, K, ET, D, χ}(data, times, backend, time_indexing) + end +end + +function Adapt.adapt_structure(to, fts::FieldTimeSeries) + LX, LY, LZ = location(fts) + return GPUAdaptedFieldTimeSeries{LX, LY, LZ}(adapt(to, fts.data), + adapt(to, fts.times), + adapt(to, fts.backend), + adapt(to, 
fts.time_indexing)) +end + +const FTS{LX, LY, LZ, TI, K} = FieldTimeSeries{LX, LY, LZ, TI, K} where {LX, LY, LZ, TI, K} +const GPUFTS{LX, LY, LZ, TI, K} = GPUAdaptedFieldTimeSeries{LX, LY, LZ, TI, K} where {LX, LY, LZ, TI, K} + +const FlavorOfFTS{LX, LY, LZ, TI, K} = Union{GPUFTS{LX, LY, LZ, TI, K}, + FTS{LX, LY, LZ, TI, K}} where {LX, LY, LZ, TI, K} + +const InMemoryFTS = FlavorOfFTS{<:Any, <:Any, <:Any, <:Any, <:AbstractInMemoryBackend} +const OnDiskFTS = FlavorOfFTS{<:Any, <:Any, <:Any, <:Any, <:OnDisk} +const TotallyInMemoryFTS = FlavorOfFTS{<:Any, <:Any, <:Any, <:Any, <:TotallyInMemory} +const PartlyInMemoryFTS = FlavorOfFTS{<:Any, <:Any, <:Any, <:Any, <:PartlyInMemory} + +const CyclicalFTS{K} = FlavorOfFTS{<:Any, <:Any, <:Any, <:Cyclical, K} where K +const LinearFTS{K} = FlavorOfFTS{<:Any, <:Any, <:Any, <:Linear, K} where K +const ClampFTS{K} = FlavorOfFTS{<:Any, <:Any, <:Any, <:Clamp, K} where K + +const CyclicalChunkedFTS = CyclicalFTS{<:PartlyInMemory} + +architecture(fts::FieldTimeSeries) = architecture(fts.grid) +time_indices(fts) = time_indices(fts.backend, fts.time_indexing, length(fts.times)) + +@inline function memory_index(fts, n) + backend = fts.backend + ti = fts.time_indexing + Nt = length(fts.times) + return memory_index(backend, ti, Nt, n) +end ##### ##### Constructors @@ -59,16 +330,32 @@ struct UnspecifiedBoundaryConditions end instantiate(T::Type) = T() -function FieldTimeSeries(loc, grid, times; +new_data(FT, grid, loc, indices, ::Nothing) = nothing + +function new_data(FT, grid, loc, indices, Nt::Int) + space_size = total_size(grid, loc, indices) + underlying_data = zeros(FT, architecture(grid), space_size..., Nt) + data = offset_data(underlying_data, grid, loc, indices) + return data +end + +time_indices_length(backend, times) = throw(ArgumentError("$backend is not a supported backend!")) +time_indices_length(::TotallyInMemory, times) = length(times) +time_indices_length(backend::PartlyInMemory, times) = length(backend) 
+time_indices_length(::OnDisk, times) = nothing + +function FieldTimeSeries(loc, grid, times=(); indices = (:, :, :), backend = InMemory(), path = nothing, name = nothing, + time_indexing = Linear(), boundary_conditions = nothing) LX, LY, LZ = loc - Nt = length(times) - data = new_data(eltype(grid), grid, loc, indices, Nt, backend) + + Nt = time_indices_length(backend, times) + data = new_data(eltype(grid), grid, loc, indices, Nt) if backend isa OnDisk isnothing(name) && isnothing(name) && @@ -77,13 +364,13 @@ function FieldTimeSeries(loc, grid, times; isnothing(path) && error(ArgumentError("Must provide the keyword argument `path` when `backend=OnDisk()`.")) isnothing(name) && error(ArgumentError("Must provide the keyword argument `name` when `backend=OnDisk()`.")) end - + return FieldTimeSeries{LX, LY, LZ}(data, grid, backend, boundary_conditions, - indices, times, path, name) + indices, times, path, name, time_indexing) end """ - FieldTimeSeries{LX, LY, LZ}(grid::AbstractGrid, times; kwargs...) where {LX, LY, LZ} = + FieldTimeSeries{LX, LY, LZ}(grid::AbstractGrid [, times=()]; kwargs...) Construct a `FieldTimeSeries` on `grid` and at `times`. @@ -91,16 +378,22 @@ Keyword arguments ================= - `indices`: spatial indices + - `backend`: backend, `InMemory(indices=Colon())` or `OnDisk()` + - `path`: path to data for `backend = OnDisk()` + - `name`: name of field for `backend = OnDisk()` """ -FieldTimeSeries{LX, LY, LZ}(grid::AbstractGrid, times; kwargs...) where {LX, LY, LZ} = - FieldTimeSeries((LX, LY, LZ), grid, times; kwargs...) +function FieldTimeSeries{LX, LY, LZ}(grid::AbstractGrid, times=(); kwargs...) where {LX, LY, LZ} + loc = (LX, LY, LZ) + return FieldTimeSeries(loc, grid, times; kwargs...) 
+end + +struct UnspecifiedBoundaryConditions end """ - FieldTimeSeries(path, name; - backend = InMemory(), + FieldTimeSeries(path, name, backend = InMemory(); grid = nothing, iterations = nothing, times = nothing) @@ -123,14 +416,13 @@ Keyword arguments comparison to recorded save times. Defaults to times associated with `iterations`. Takes precedence over `iterations` if `times` is specified. """ -FieldTimeSeries(path::String, name::String; backend=InMemory(), kw...) = - FieldTimeSeries(path, name, backend; kw...) - -function FieldTimeSeries(path::String, name::String, backend::AbstractDataBackend; +function FieldTimeSeries(path::String, name::String; + backend = InMemory(), architecture = nothing, grid = nothing, location = nothing, boundary_conditions = UnspecifiedBoundaryConditions(), + time_indexing = Linear(), iterations = nothing, times = nothing) @@ -153,9 +445,13 @@ function FieldTimeSeries(path::String, name::String, backend::AbstractDataBacken isnothing(grid) && (grid = file["serialized/grid"]) - # Default to CPU if neither architecture nor grid is specified - architecture = isnothing(architecture) ? - (isnothing(grid) ? CPU() : Architectures.architecture(grid)) : architecture + if isnothing(architecture) # determine architecture + if isnothing(grid) # go to default + architecture = CPU() + else # there's a grid, use that architecture + architecture = Architectures.architecture(grid) + end + end # This should be removed eventually... 
(4/5/2022) grid = try @@ -207,102 +503,19 @@ function FieldTimeSeries(path::String, name::String, backend::AbstractDataBacken close(file) LX, LY, LZ = Location + loc = map(instantiate, Location) - Nt = length(times) - data = new_data(eltype(grid), grid, loc, indices, Nt, backend) + Nt = time_indices_length(backend, times) + data = new_data(eltype(grid), grid, loc, indices, Nt) time_series = FieldTimeSeries{LX, LY, LZ}(data, grid, backend, boundary_conditions, - indices, times, path, name) + indices, times, path, name, time_indexing) set!(time_series, path, name) return time_series end -# Making FieldTimeSeries behave like Vector -Base.lastindex(fts::FieldTimeSeries) = size(fts, 4) -Base.firstindex(fts::FieldTimeSeries) = 1 -Base.length(fts::FieldTimeSeries) = size(fts, 4) - -# Linear time interpolation -function Base.getindex(fts::FieldTimeSeries, time_index::Time) - Ntimes = length(fts.times) - time = time_index.time - n₁, n₂ = index_binary_search(fts.times, time, Ntimes) - if n₁ == n₂ # no interpolation - return fts[n₁] - end - - # Calculate fractional index - n = @inbounds (n₂ - n₁) / (fts.times[n₂] - fts.times[n₁]) * (time - fts.times[n₁]) + n₁ - - # Make a Field representing a linear interpolation in time - time_interpolated_field = Field(fts[n₂] * (n - n₁) + fts[n₁] * (n₂ - n)) - - # Compute the field and return it - return compute!(time_interpolated_field) -end - -# Linear time interpolation, used by FieldTimeSeries and GPUAdaptedFieldTimeSeries -@inline function interpolating_get_index(fts, i, j, k, time_index) - Ntimes = length(fts.times) - time = time_index.time - n₁, n₂ = index_binary_search(fts.times, time, Ntimes) - - # fractional index - n = @inbounds (n₂ - n₁) / (fts.times[n₂] - fts.times[n₁]) * (time - fts.times[n₁]) + n₁ - interpolated_fts = getindex(fts, i, j, k, n₂) * (n - n₁) + getindex(fts, i, j, k, n₁) * (n₂ - n) - - # Don't interpolate if n = 0. 
- return ifelse(n₁ == n₂, getindex(fts, i, j, k, n₁), interpolated_fts) -end - -@inline Base.getindex(fts::FieldTimeSeries, i::Int, j::Int, k::Int, time_index::Time) = - interpolating_get_index(fts, i, j, k, time_index) - -function interpolate!(target_fts::FieldTimeSeries, source_fts::FieldTimeSeries) - - # TODO: support time-interpolation too. - # This requires extending the low-level Field interpolation utilities - # to support time-indexing. - target_fts.times == source_fts.times || - throw(ArgumentError("Cannot interpolate two FieldTimeSeries with different times.")) - - times = target_fts.times - Nt = length(times) - - target_grid = target_fts.grid - source_grid = source_fts.grid - - @assert architecture(target_grid) == architecture(source_grid) - arch = architecture(target_grid) - - # Make locations - source_location = Tuple(L() for L in location(source_fts)) - target_location = Tuple(L() for L in location(target_fts)) - - launch!(arch, target_grid, size(target_fts), - _interpolate_field_time_series!, - target_fts.data, target_grid, target_location, - source_fts.data, source_grid, source_location) - - fill_halo_regions!(target_fts) - - return nothing -end - -@kernel function _interpolate_field_time_series!(target_fts, target_grid, target_location, - source_fts, source_grid, source_location) - - # 4D index, cool! - i, j, k, n = @index(Global, NTuple) - - source_field = view(source_fts, :, :, :, n) - target_node = node(i, j, k, target_grid, target_location...) - - @inbounds target_fts[i, j, k, n] = interpolate(target_node, source_field, source_location, source_grid) -end - """ Field(location, path, name, iter; grid = nothing, @@ -319,201 +532,87 @@ function Field(location, path::String, name::String, iter; indices = (:, :, :), boundary_conditions = nothing) - file = jldopen(path) - # Default to CPU if neither architecture nor grid is specified - architecture = isnothing(architecture) ? - (isnothing(grid) ? 
CPU() : Architectures.architecture(grid)) : - architecture - - grid = isnothing(grid) ? - on_architecture(architecture, file["serialized/grid"]) : grid + if isnothing(architecture) + if isnothing(grid) + architecture = CPU() + else + architecture = Architectures.architecture(grid) + end + end + + # Load the grid and data from file + file = jldopen(path) - raw_data = arch_array(architecture, file["timeseries/$name/$iter"]) + isnothing(grid) && (grid = file["serialized/grid"]) + raw_data = file["timeseries/$name/$iter"] close(file) - data = offset_data(raw_data, grid, location, indices) + # Change grid to specified architecture? + grid = on_architecture(architecture, grid) + raw_data = arch_array(architecture, raw_data) + data = offset_data(raw_data, grid, location, indices) return Field(location, grid; boundary_conditions, indices, data) end ##### -##### set! +##### Basic behavior ##### -function set!(fts::FieldTimeSeries, fields_vector::AbstractVector{<:AbstractField}) - raw_data = parent(fts) - file = jldopen(path) - - for (n, field) in enumerate(fields_vector) - raw_data[:, :, :, n] .= parent(field) - end +Base.lastindex(fts::FlavorOfFTS, dim) = lastindex(fts.data, dim) +Base.parent(fts::InMemoryFTS) = parent(fts.data) +Base.parent(fts::OnDiskFTS) = nothing +indices(fts::FieldTimeSeries) = fts.indices +interior(fts::FieldTimeSeries, I...) = view(interior(fts), I...) 
- close(file) +# Make FieldTimeSeries behave like Vector wrt to singleton indices +Base.length(fts::FlavorOfFTS) = length(fts.times) +Base.lastindex(fts::FlavorOfFTS) = length(fts.times) +Base.firstindex(fts::FlavorOfFTS) = 1 - return nothing -end +Base.length(fts::PartlyInMemoryFTS) = length(fts.backend) function interior(fts::FieldTimeSeries) - loc = instantiate.(location(fts)) - topo = instantiate.(topology(fts.grid)) + loc = map(instantiate, location(fts)) + topo = map(instantiate, topology(fts.grid)) sz = size(fts.grid) halo_sz = halo_size(fts.grid) - i_interior = interior_parent_indices.(loc, topo, sz, halo_sz) - + i_interior = map(interior_parent_indices, loc, topo, sz, halo_sz) indices = fts.indices - i_view = interior_view_indices.(indices, i_interior) + i_view = map(interior_view_indices, indices, i_interior) return view(parent(fts), i_view..., :) end -interior(fts::FieldTimeSeries, I...) = view(interior(fts), I...) -indices(fts::FieldTimeSeries) = fts.indices - -function Statistics.mean(fts::FieldTimeSeries; dims=:) - m = mean(fts[1]; dims) - Nt = length(fts) - - if dims isa Colon - for n = 2:Nt - m += mean(fts[n]) - end - - return m / Nt - else - for n = 2:Nt - m .+= mean(fts[n]; dims) - end - - m ./= Nt - - return m - end -end - -##### -##### Methods -##### - -# Include the time dimension. -@inline Base.size(fts::FieldTimeSeries) = (size(fts.grid, location(fts), fts.indices)..., length(fts.times)) -@propagate_inbounds Base.setindex!(fts::FieldTimeSeries, val, inds...) = Base.setindex!(fts.data, val, inds...) - -##### -##### Basic support for reductions -##### -##### TODO: support for reductions across _time_ (ie when 4 ∈ dims) -##### - -const FTS = FieldTimeSeries - -for reduction in (:sum, :maximum, :minimum, :all, :any, :prod) - reduction! = Symbol(reduction, '!') - - @eval begin - - # Allocating - function Base.$(reduction)(f::Function, fts::FTS; dims=:, kw...) - if dims isa Colon - return Base.$(reduction)($(reduction)(f, fts[n]; kw...) 
for n in 1:length(fts.times)) - else - T = filltype(Base.$(reduction!), fts) - loc = LX, LY, LZ = reduced_location(location(fts); dims) - times = fts.times - rts = FieldTimeSeries{LX, LY, LZ}(grid, times, T; indices=fts.indices) - return Base.$(reduction!)(f, rts, fts; kw...) - end - end - - Base.$(reduction)(fts::FTS; kw...) = Base.$(reduction)(identity, fts; kw...) - - function Base.$(reduction!)(f::Function,rts::FTS, fts::FTS; dims=:, kw...) - dims isa Tuple && 4 ∈ dims && error("Reduction across the time dimension (dim=4) is not yet supported!") - for n = 1:length(rts) - Base.$(reduction!)(f, rts[i], fts[i]; dims, kw...) - end - return rts - end - - Base.$(reduction!)(rts::FTS, fts::FTS; kw...) = Base.$(reduction!)(identity, rts, fts; kw...) - end -end - -##### -##### Show methods -##### +# FieldTimeSeries boundary conditions +const CPUFTSBC = BoundaryCondition{<:Any, <:FieldTimeSeries} +const GPUFTSBC = BoundaryCondition{<:Any, <:GPUAdaptedFieldTimeSeries} +const FTSBC = Union{CPUFTSBC, GPUFTSBC} -backend_str(::Type{InMemory}) = "InMemory" -backend_str(::Type{OnDisk}) = "OnDisk" +@inline getbc(bc::FTSBC, i::Int, j::Int, grid::AbstractGrid, clock, args...) 
= bc.condition[i, j, Time(clock.time)] ##### -##### show +##### Fill halo regions ##### -function Base.summary(fts::FieldTimeSeries{LX, LY, LZ, K}) where {LX, LY, LZ, K} - arch = architecture(fts) - B = string(typeof(fts.backend).name.wrapper) - sz_str = string(join(size(fts), "×")) +const MAX_FTS_TUPLE_SIZE = 10 - path = fts.path - name = fts.name - A = typeof(arch) +fill_halo_regions!(fts::OnDiskFTS) = nothing - if isnothing(path) - suffix = " on $A" - else - suffix = " of $name at $path" - end +function fill_halo_regions!(fts::InMemoryFTS) + partitioned_indices = Iterators.partition(time_indices(fts), MAX_FTS_TUPLE_SIZE) + partitioned_indices = collect(partitioned_indices) + Ni = length(partitioned_indices) - return string("$sz_str FieldTimeSeries{$B} located at ", show_location(fts), suffix) -end - -function Base.show(io::IO, fts::FieldTimeSeries) - prefix = string(summary(fts), '\n', - "├── grid: ", summary(fts.grid), '\n', - "├── indices: ", indices_summary(fts), '\n') - - suffix = field_time_series_suffix(fts) - - return print(io, prefix, suffix) -end - -function field_time_series_suffix(fts::InMemoryFieldTimeSeries) - ii = fts.backend.index_range - - if ii isa Colon - backend_str = "├── backend: InMemory(:)" - else - N = length(ii) - if N < 6 - index_range_str = string(ii) - else - index_range_str = string("[", ii[1], - ", ", ii[2], - ", ", ii[3], - " … ", - ii[end-2], ", ", - ii[end-1], ", ", - ii[end], "]") - - end - - backend_str = string("├── backend: InMemory(", index_range_str, ")", '\n') + asyncmap(1:Ni) do i + indices = partitioned_indices[i] + fts_tuple = Tuple(fts[n] for n in indices) + fill_halo_regions!(fts_tuple) end - path_str = isnothing(fts.path) ? "" : string("├── path: ", fts.path, '\n') - name_str = isnothing(fts.name) ? 
"" : string("├── name: ", fts.name, '\n') - - return string(backend_str, '\n', - path_str, - name_str, - "└── data: ", summary(fts.data), '\n', - " └── ", data_summary(fts.data)) + return nothing end -field_time_series_suffix(fts::OnDiskFieldTimeSeries) = - string("├── backend: ", summary(fts.backend), '\n', - "├── path: ", fts.path, '\n', - "└── name: ", fts.name) diff --git a/src/OutputReaders/field_time_series_indexing.jl b/src/OutputReaders/field_time_series_indexing.jl new file mode 100644 index 0000000000..c50037f49b --- /dev/null +++ b/src/OutputReaders/field_time_series_indexing.jl @@ -0,0 +1,267 @@ +import Oceananigans.Fields: interpolate +using Oceananigans.Fields: interpolator, _interpolate, fractional_indices + +##### +##### Computation of time indices for interpolation +##### + +# Simplest implementation, linear extrapolation if out-of-bounds +@inline interpolating_time_indices(::Linear, times, t) = time_index_binary_search(times, t) + +# Cyclical implementation if out-of-bounds (wrap around the time-series) +@inline function interpolating_time_indices(ti::Cyclical, times, t) + Nt = length(times) + t¹ = first(times) + tᴺ = last(times) + + T = ti.period + Δt = T - (tᴺ - t¹) + + # Compute modulus of shifted time, then add shift back + τ = t - t¹ + mod_τ = mod(τ, T) + mod_t = mod_τ + t¹ + + ñ, n₁, n₂ = time_index_binary_search(times, mod_t) + + cycling = ñ > 1 # we are _between_ tᴺ and t¹ + T + cycled_indices = (ñ - 1, Nt, 1) + uncycled_indices = (ñ, n₁, n₂) + + return ifelse(cycling, cycled_indices, uncycled_indices) +end + +# Clamp mode if out-of-bounds, i.e get the neareast neighbor +@inline function interpolating_time_indices(::Clamp, times, t) + n, n₁, n₂ = time_index_binary_search(times, t) + + beyond_indices = (0, n₂, n₂) # Beyond the last time: return n₂ + before_indices = (0, n₁, n₁) # Before the first time: return n₁ + unclamped_indices = (n, n₁, n₂) # Business as usual + + Nt = length(times) + + indices = ifelse(n + n₁ > Nt, beyond_indices, + 
ifelse(n + n₁ < 1, before_indices, unclamped_indices)) + + return indices +end + +@inline function time_index_binary_search(times, t) + Nt = length(times) + + # n₁ and n₂ are the index to interpolate inbetween and + # n is a fractional index where 0 ≤ n ≤ 1 + n₁, n₂ = index_binary_search(times, t, Nt) + + @inbounds begin + t₁ = times[n₁] + t₂ = times[n₂] + end + + # "Fractional index" ñ ∈ (0, 1) + ñ = (n₂ - n₁) / (t₂ - t₁) * (t - t₁) + + ñ = ifelse(n₂ == n₁, zero(ñ), ñ) + + return ñ, n₁, n₂ +end + +##### +##### `getindex` and `setindex` with integers `(i, j, n)` +##### + +import Base: getindex + +function getindex(fts::OnDiskFTS, n::Int) + # Load data + arch = architecture(fts) + file = jldopen(fts.path) + iter = keys(file["timeseries/t"])[n] + raw_data = arch_array(arch, file["timeseries/$(fts.name)/$iter"]) + close(file) + + # Wrap Field + loc = location(fts) + field_data = offset_data(raw_data, fts.grid, loc, fts.indices) + + return Field(loc, fts.grid; + indices = fts.indices, + boundary_conditions = fts.boundary_conditions, + data = field_data) +end + +@propagate_inbounds getindex(f::FlavorOfFTS, i, j, k, n::Int) = getindex(f.data, i, j, k, memory_index(f, n)) +@propagate_inbounds setindex!(f::FlavorOfFTS, v, i, j, k, n::Int) = setindex!(f.data, v, i, j, k, memory_index(f, n)) + +# Reduced FTS +const XYFTS = FlavorOfFTS{<:Any, <:Any, Nothing, <:Any, <:Any} +const XZFTS = FlavorOfFTS{<:Any, Nothing, <:Any, <:Any, <:Any} +const YZFTS = FlavorOfFTS{Nothing, <:Any, <:Any, <:Any, <:Any} + +@propagate_inbounds getindex(f::XYFTS, i::Int, j::Int, n::Int) = getindex(f.data, i, j, 1, memory_index(f, n)) +@propagate_inbounds getindex(f::XZFTS, i::Int, k::Int, n::Int) = getindex(f.data, i, 1, k, memory_index(f, n)) +@propagate_inbounds getindex(f::YZFTS, j::Int, k::Int, n::Int) = getindex(f.data, 1, j, k, memory_index(f, n)) + +##### +##### Time interpolation / extrapolation +##### Local getindex with integers `(i, j, k)` and `n :: Time` +##### + +# Valid for all flavors 
# of FTS
@inline getindex(fts::FlavorOfFTS, i::Int, j::Int, k::Int, time_index::Time) =
    interpolating_getindex(fts, i, j, k, time_index)

# Linearly interpolate the series at grid point (i, j, k) to the time
# wrapped by `time_index`, using the series' time-extrapolation rules.
@inline function interpolating_getindex(fts, i, j, k, time_index)
    ñ, n₁, n₂ = interpolating_time_indices(fts.time_indexing, fts.times, time_index.time)

    @inbounds begin
        ψ₁ = getindex(fts, i, j, k, n₁)
        ψ₂ = getindex(fts, i, j, k, n₂)
    end

    ψ̃ = ψ₂ * ñ + ψ₁ * (1 - ñ)

    # Don't interpolate if n₁ == n₂.
    return ifelse(n₁ == n₂, ψ₁, ψ̃)
end

#####
##### Global `getindex` with `time_index :: Time`
#####

# Linear time interpolation: return a whole Field representing the series
# interpolated to `time_index.time`.
function Base.getindex(fts::FieldTimeSeries, time_index::Time)
    # Calculate fractional index (0 ≤ ñ ≤ 1)
    ñ, n₁, n₂ = interpolating_time_indices(fts.time_indexing, fts.times, time_index.time)

    if n₁ == n₂ # no interpolation needed
        return fts[n₁]
    end

    # Otherwise, make a Field representing a linear interpolation in time
    ψ₁ = fts[n₁]
    ψ₂ = fts[n₂]
    ψ̃ = Field(ψ₂ * ñ + ψ₁ * (1 - ñ))

    # Compute the field and return it
    return compute!(ψ̃)
end

#####
##### Linear time- and space-interpolation of a FTS
#####

# Unpack the series' internals and forward to the OffsetArray method below.
@inline function interpolate(at_node, at_time_index::Time, from_fts::FlavorOfFTS, from_loc, from_grid)
    data = from_fts.data
    times = from_fts.times
    backend = from_fts.backend
    time_indexing = from_fts.time_indexing
    return interpolate(at_node, at_time_index, data, from_loc, from_grid, times, backend, time_indexing)
end

# Interpolate `data` in space (to `at_node`) and time (to `at_time_index.time`).
@inline function interpolate(at_node, at_time_index::Time, data::OffsetArray,
                             from_loc, from_grid, times, backend, time_indexing)

    at_time = at_time_index.time

    # Build space interpolators
    ii, jj, kk = fractional_indices(at_node, from_grid, from_loc...)

    ix = interpolator(ii)
    iy = interpolator(jj)
    iz = interpolator(kk)

    ñ, n₁, n₂ = interpolating_time_indices(time_indexing, times, at_time)

    # Convert global time indices to in-memory indices for `data`
    Nt = length(times)
    m₁ = memory_index(backend, time_indexing, Nt, n₁)
    m₂ = memory_index(backend, time_indexing, Nt, n₂)

    ψ₁ = _interpolate(data, ix, iy, iz, m₁)
    ψ₂ = _interpolate(data, ix, iy, iz, m₂)
    ψ̃ = ψ₂ * ñ + ψ₁ * (1 - ñ)

    # Don't interpolate if n₁ == n₂
    return ifelse(n₁ == n₂, ψ₁, ψ̃)
end

"""
    interpolate!(target_fts::FieldTimeSeries, source_fts::FieldTimeSeries)

Interpolate `source_fts` in space and time onto the grid and times of
`target_fts`, writing the result into `target_fts` and filling its halos.
Both series must live on the same architecture.
"""
function interpolate!(target_fts::FieldTimeSeries, source_fts::FieldTimeSeries)

    target_grid = target_fts.grid
    source_grid = source_fts.grid

    architecture(target_grid) == architecture(source_grid) ||
        throw(ArgumentError("Cannot interpolate! between FieldTimeSeries on different architectures"))

    arch = architecture(target_grid)

    # Make locations
    source_location = map(instantiate, location(source_fts))
    target_location = map(instantiate, location(target_fts))

    # Pass the FieldTimeSeries themselves (not their bare `.data` arrays):
    # the kernel needs `times` and the series' time indexing, and `interpolate`
    # dispatches on the FTS type. (Passing `.data` gave the kernel OffsetArrays,
    # which have no `times` property and no matching `interpolate` method.)
    launch!(arch, target_grid, size(target_fts),
            _interpolate_field_time_series!,
            target_fts, target_grid, target_location,
            source_fts, source_grid, source_location)

    fill_halo_regions!(target_fts)

    return nothing
end

@kernel function _interpolate_field_time_series!(target_fts, target_grid, target_location,
                                                 source_fts, source_grid, source_location)

    # 4D index over (x, y, z, time), cool!
    i, j, k, n = @index(Global, NTuple)

    target_node = node(i, j, k, target_grid, target_location...)

    # Wrap the target time so `interpolate` dispatches on `::Time`
    at_time = @inbounds Time(target_fts.times[n])

    @inbounds target_fts[i, j, k, n] = interpolate(target_node, at_time,
                                                   source_fts, source_location, source_grid)
end

#####
##### FieldTimeSeries updating
#####

# Fallbacks that do nothing
update_field_time_series!(fts, time::Time) = nothing
update_field_time_series!(fts, n::Int) = nothing

# Update the `fts` to contain the time `time_index.time`.
# Linear extrapolation, simple version
function update_field_time_series!(fts::PartlyInMemoryFTS, time_index::Time)
    t = time_index.time
    ñ, n₁, n₂ = interpolating_time_indices(fts.time_indexing, fts.times, t)
    return update_field_time_series!(fts, n₁, n₂)
end

# Ensure the in-memory window of `fts` contains time indices n₁ and n₂,
# reloading data from disk (via `set!`) when either falls outside it.
function update_field_time_series!(fts::PartlyInMemoryFTS, n₁::Int, n₂=n₁)
    idxs = time_indices(fts)
    in_range = n₁ ∈ idxs && n₂ ∈ idxs

    if !in_range
        # Update backend so the window starts at n₁, then reload
        Nm = length(fts.backend)
        start = n₁
        fts.backend = new_backend(fts.backend, start, Nm)
        set!(fts)
    end

    return nothing
end

# If `n` is not in memory, getindex automatically updates the data in memory
# so that `n` is the first index available.
function getindex(fts::InMemoryFTS, n::Int)
    update_field_time_series!(fts, n)

    m = memory_index(fts, n)
    underlying_data = view(parent(fts), :, :, :, m)
    data = offset_data(underlying_data, fts.grid, location(fts), fts.indices)

    return Field(location(fts), fts.grid; data, fts.boundary_conditions, fts.indices)
end

using Statistics
import Oceananigans.Fields: conditional_length

@inline conditional_length(fts::FieldTimeSeries) = length(fts) * conditional_length(fts[1])

#####
##### Methods
#####

# Include the time dimension.
@inline Base.size(fts::FieldTimeSeries) = (size(fts.grid, location(fts), fts.indices)..., length(fts.times))
@propagate_inbounds Base.setindex!(fts::FieldTimeSeries, val, inds...) = Base.setindex!(fts.data, val, inds...)

#####
##### Basic support for reductions
#####
##### TODO: support for reductions across _time_ (ie when 4 ∈ dims)
#####

for reduction in (:sum, :maximum, :minimum, :all, :any, :prod)
    reduction! = Symbol(reduction, '!')

    @eval begin

        # Allocating reduction over an entire FieldTimeSeries.
        # `dims=:` reduces over space and time; otherwise we reduce each
        # time slice into a reduced-location FieldTimeSeries.
        function Base.$(reduction)(f::Function, fts::FTS; dims=:, kw...)
            if dims isa Colon
                return Base.$(reduction)($(reduction)(f, fts[n]; kw...) for n in 1:length(fts.times))
            else
                T = filltype(Base.$(reduction!), fts)
                loc = LX, LY, LZ = reduced_location(location(fts); dims)
                times = fts.times
                # Bug fix: `grid` was undefined here; the reduced series lives
                # on the same grid as `fts`.
                rts = FieldTimeSeries{LX, LY, LZ}(fts.grid, times, T; indices=fts.indices)
                return Base.$(reduction!)(f, rts, fts; kw...)
            end
        end

        Base.$(reduction)(fts::FTS; kw...) = Base.$(reduction)(identity, fts; kw...)

        # In-place reduction, one time slice at a time.
        function Base.$(reduction!)(f::Function, rts::FTS, fts::FTS; dims=:, kw...)
            dims isa Tuple && 4 ∈ dims && error("Reduction across the time dimension (dim=4) is not yet supported!")
            # Bug fix: the loop variable was `n` but the body indexed with an
            # undefined `i`; also iterate over time steps, not array elements.
            for n in 1:length(rts.times)
                Base.$(reduction!)(f, rts[n], fts[n]; dims, kw...)
            end
            return rts
        end

        Base.$(reduction!)(rts::FTS, fts::FTS; kw...) = Base.$(reduction!)(identity, rts, fts; kw...)
    end
end
-const YZFTS = FieldTimeSeries{Nothing, <:Any, <:Any} - -const XYGPUFTS = GPUAdaptedFieldTimeSeries{<:Any, <:Any, Nothing} -const XZGPUFTS = GPUAdaptedFieldTimeSeries{<:Any, Nothing, <:Any} -const YZGPUFTS = GPUAdaptedFieldTimeSeries{Nothing, <:Any, <:Any} - -# Handle `Nothing` locations to allow `getbc` to work -@propagate_inbounds Base.getindex(fts::XYGPUFTS, i::Int, j::Int, n) = fts[i, j, 1, n] -@propagate_inbounds Base.getindex(fts::XZGPUFTS, i::Int, k::Int, n) = fts[i, 1, k, n] -@propagate_inbounds Base.getindex(fts::YZGPUFTS, j::Int, k::Int, n) = fts[1, j, k, n] - -@propagate_inbounds Base.getindex(fts::XYFTS, i::Int, j::Int, n) = fts[i, j, 1, n] -@propagate_inbounds Base.getindex(fts::XZFTS, i::Int, k::Int, n) = fts[i, 1, k, n] -@propagate_inbounds Base.getindex(fts::YZFTS, j::Int, k::Int, n) = fts[1, j, k, n] - -# Only `getindex` for GPUAdaptedFieldTimeSeries, no need to `setindex` -Base.getindex(fts::GPUAdaptedFieldTimeSeries, i::Int, j::Int, k::Int, n::Int) = fts.data[i, j, k, n] - -# Extend Linear time interpolation for GPUAdaptedFieldTimeSeries -function Base.getindex(fts::GPUAdaptedFieldTimeSeries, i::Int, j::Int, k::Int, time_index::Time) - Ntimes = length(fts.times) - time = time_index.time - n₁, n₂ = index_binary_search(fts.times, time, Ntimes) - - # fractional index - @inbounds n = (n₂ - n₁) / (fts.times[n₂] - fts.times[n₁]) * (time - fts.times[n₁]) + n₁ - fts_interpolated = getindex(fts, i, j, k, n₂) * (n - n₁) + getindex(fts, i, j, k, n₁) * (n₂ - n) - - # Don't interpolate if n = 0. 
- return ifelse(n₁ == n₂, getindex(fts, i, j, k, n₁), fts_interpolated) -end - diff --git a/src/OutputReaders/memory_allocated_field_time_series.jl b/src/OutputReaders/memory_allocated_field_time_series.jl deleted file mode 100644 index cbc53acdf2..0000000000 --- a/src/OutputReaders/memory_allocated_field_time_series.jl +++ /dev/null @@ -1,98 +0,0 @@ - -function new_data(FT, grid, loc, indices, Nt, backend::InMemory) - space_size = total_size(grid, loc, indices) - Nt = backend.index_range == Colon() ? Nt : length(backend.index_range) - underlying_data = zeros(FT, architecture(grid), space_size..., Nt) - data = offset_data(underlying_data, grid, loc, indices) - return data -end - -@propagate_inbounds Base.getindex(f::InMemoryFieldTimeSeries, i, j, k, n::Int) = - f.data[i, j, k, n - f.backend.index_range[1] + 1] - -@propagate_inbounds Base.getindex(f::TotallyInMemoryFieldTimeSeries, i, j, k, n::Int) = - f.data[i, j, k, n] - -@propagate_inbounds Base.setindex!(f::InMemoryFieldTimeSeries, v, i, j, k, n::Int) = - setindex!(f.data, v, i, j, k, n - f.backend.index_range[1] + 1) - -@propagate_inbounds Base.setindex!(f::TotallyInMemoryFieldTimeSeries, v, i, j, k, n::Int) = - setindex!(f.data, v, i, j, k, n) - -Base.parent(fts::InMemoryFieldTimeSeries) = parent(fts.data) - -compute_time_index(index_range, n) = n - index_range[1] + 1 -compute_time_index(::Colon, n) = n - -# If `n` is not in memory, getindex automatically sets the data in memory to have the `n` -# as the second index (to allow interpolation with the previous time step) -# If n is `1` or within the end the timeseries different rules apply -function Base.getindex(fts::InMemoryFieldTimeSeries, n::Int) - update_field_time_series!(fts, n) - - index_range = fts.backend.index_range - time_index = compute_time_index(index_range, n) - underlying_data = view(parent(fts), :, :, :, time_index) - - data = offset_data(underlying_data, fts.grid, location(fts), fts.indices) - - return Field(location(fts), fts.grid; data, 
fts.boundary_conditions, fts.indices) -end - -set!(fts::InMemoryFieldTimeSeries, f, index::Int) = set!(fts[index], f) - -iterations_from_file(file, ::Colon) = parse.(Int, keys(file["timeseries/t"])) - -function iterations_from_file(file, index_range::UnitRange) - all_iterations = iterations_from_file(file, Colon()) - return all_iterations[index_range] -end - -time_indices(fts::InMemoryFieldTimeSeries) = time_indices(fts.backend.index_range, fts.times) -time_indices(::Colon, times) = UnitRange(1, length(times)) -time_indices(index_range, times) = index_range - -find_time_index(time::Number, file_times) = findfirst(t -> t ≈ time, file_times) -find_time_index(time::AbstractTime, file_times) = findfirst(t -> t == time, file_times) - -function set!(fts::InMemoryFieldTimeSeries, path::String, name::String) - index_range = fts.backend.index_range - - file = jldopen(path) - file_iterations = iterations_from_file(file, index_range) - file_times = [file["timeseries/t/$i"] for i in file_iterations] - close(file) - - times = fts.times[index_range] - indices = time_indices(fts) - - for (n, time) in zip(indices, times) - file_index = find_time_index(time, file_times) - file_iter = file_iterations[file_index] - - field_n = Field(location(fts), path, name, file_iter, - indices = fts.indices, - boundary_conditions = fts.boundary_conditions, - grid = fts.grid) - - set!(fts[n], field_n) - end - - return nothing -end - -const MAX_FTS_TUPLE_SIZE = 10 - -function fill_halo_regions!(fts::InMemoryFieldTimeSeries) - partitioned_indices = Iterators.partition(time_indices(fts), MAX_FTS_TUPLE_SIZE) |> collect - Ni = length(partitioned_indices) - - asyncmap(1:Ni) do i - indices = partitioned_indices[i] - fts_tuple = Tuple(fts[n] for n in indices) - fill_halo_regions!(fts_tuple) - end - - return nothing -end - diff --git a/src/OutputReaders/on_disk_field_time_series.jl b/src/OutputReaders/on_disk_field_time_series.jl deleted file mode 100644 index fb99375a48..0000000000 --- 
a/src/OutputReaders/on_disk_field_time_series.jl +++ /dev/null @@ -1,66 +0,0 @@ - -new_data(FT, grid, loc, indices, Nt, ::OnDisk) = nothing - -Base.parent(fts::OnDiskFieldTimeSeries) = nothing -Base.length(fts::OnDiskFieldTimeSeries) = length(fts.times) - -function Base.getindex(fts::FieldTimeSeries{LX, LY, LZ, OnDisk}, n::Int) where {LX, LY, LZ} - # Load data - arch = architecture(fts) - file = jldopen(fts.path) - iter = keys(file["timeseries/t"])[n] - raw_data = arch_array(arch, file["timeseries/$(fts.name)/$iter"]) - close(file) - - # Wrap Field - loc = (LX, LY, LZ) - field_data = offset_data(raw_data, fts.grid, loc, fts.indices) - - return Field(loc, fts.grid; - indices = fts.indices, - boundary_conditions = fts.boundary_conditions, - data = field_data) -end - -##### -##### set! -##### - -# Write property only if it does not already exist -function maybe_write_property!(file, property, data) - try - test = file[property] - catch - file[property] = data - end -end - -""" - set!(fts::OnDiskFieldTimeSeries, field::Field, time_index::Int) - -Write the data in `parent(field)` to the file at `fts.path`, -under `fts.name` and at index `fts.times[time_index]`. 
-""" -function set!(fts::OnDiskFieldTimeSeries, field::Field, time_index::Int) - fts.grid == field.grid || error("The grids attached to the Field and \ - FieldTimeSeries appear to be different.") - path = fts.path - name = fts.name - jldopen(path, "a+") do file - initialize_file!(file, name, fts) - maybe_write_property!(file, "timeseries/t/$time_index", fts.times[time_index]) - maybe_write_property!(file, "timeseries/$name/$time_index", parent(field)) - end -end - -function initialize_file!(file, name, fts) - maybe_write_property!(file, "serialized/grid", fts.grid) - maybe_write_property!(file, "timeseries/$name/serialized/location", location(fts)) - maybe_write_property!(file, "timeseries/$name/serialized/indices", indices(fts)) - maybe_write_property!(file, "timeseries/$name/serialized/boundary_conditions", boundary_conditions(fts)) - return nothing -end - -set!(fts::OnDiskFieldTimeSeries, path::String, name::String) = nothing -fill_halo_regions!(fts::OnDiskFieldTimeSeries) = nothing - diff --git a/src/OutputReaders/set_field_time_series.jl b/src/OutputReaders/set_field_time_series.jl new file mode 100644 index 0000000000..8e441679a1 --- /dev/null +++ b/src/OutputReaders/set_field_time_series.jl @@ -0,0 +1,91 @@ +##### +##### set! +##### + +iterations_from_file(file) = parse.(Int, keys(file["timeseries/t"])) + +find_time_index(time::Number, file_times) = findfirst(t -> t ≈ time, file_times) +find_time_index(time::AbstractTime, file_times) = findfirst(t -> t == time, file_times) + +function set!(fts::InMemoryFTS, path::String=fts.path, name::String=fts.name) + file = jldopen(path) + file_iterations = iterations_from_file(file) + file_times = [file["timeseries/t/$i"] for i in file_iterations] + close(file) + + # TODO: a potential optimization here might be to load + # all of the data into a single array, and then transfer that + # to parent(fts). 
+ for n in time_indices(fts) + t = fts.times[n] + file_index = find_time_index(t, file_times) + file_iter = file_iterations[file_index] + + # Note: use the CPU for this step + field_n = Field(location(fts), path, name, file_iter, + architecture = CPU(), + indices = fts.indices, + boundary_conditions = fts.boundary_conditions) + + # Potentially transfer from CPU to GPU + set!(fts[n], field_n) + end + + return nothing +end + +set!(fts::InMemoryFTS, value, n::Int) = set!(fts[n], value) + +function set!(fts::InMemoryFTS, fields_vector::AbstractVector{<:AbstractField}) + raw_data = parent(fts) + file = jldopen(path) + + for (n, field) in enumerate(fields_vector) + nth_raw_data = view(raw_data, :, :, :, n) + copyto!(nth_raw_data, parent(field)) + # raw_data[:, :, :, n] .= parent(field) + end + + close(file) + + return nothing +end + +# Write property only if it does not already exist +function maybe_write_property!(file, property, data) + try + test = file[property] + catch + file[property] = data + end +end + +""" + set!(fts::OnDiskFieldTimeSeries, field::Field, n::Int, time=fts.times[time_index]) + +Write the data in `parent(field)` to the file at `fts.path`, +under `fts.name` and at `time_index`. The save field is assigned `time`, +which is extracted from `fts.times[time_index]` if not provided. 
+""" +function set!(fts::OnDiskFTS, field::Field, n::Int, time=fts.times[n]) + fts.grid == field.grid || error("The grids attached to the Field and \ + FieldTimeSeries appear to be different.") + path = fts.path + name = fts.name + jldopen(path, "a+") do file + initialize_file!(file, name, fts) + maybe_write_property!(file, "timeseries/t/$n", time) + maybe_write_property!(file, "timeseries/$name/$n", Array(parent(field))) + end +end + +function initialize_file!(file, name, fts) + maybe_write_property!(file, "serialized/grid", fts.grid) + maybe_write_property!(file, "timeseries/$name/serialized/location", location(fts)) + maybe_write_property!(file, "timeseries/$name/serialized/indices", indices(fts)) + maybe_write_property!(file, "timeseries/$name/serialized/boundary_conditions", boundary_conditions(fts)) + return nothing +end + +set!(fts::OnDiskFTS, path::String, name::String) = nothing + diff --git a/src/OutputReaders/show_field_time_series.jl b/src/OutputReaders/show_field_time_series.jl new file mode 100644 index 0000000000..8251a0ec97 --- /dev/null +++ b/src/OutputReaders/show_field_time_series.jl @@ -0,0 +1,61 @@ +using Oceananigans.Fields: show_location, data_summary +using Oceananigans.Utils: prettysummary + +##### +##### Show methods +##### + +Base.summary(::Clamp) = "Clamp()" +Base.summary(::Linear) = "Linear()" +Base.summary(ti::Cyclical) = string("Cyclical(period=", prettysummary(ti.period), ")") + +function Base.summary(fts::FieldTimeSeries{LX, LY, LZ, K}) where {LX, LY, LZ, K} + arch = architecture(fts) + B = string(typeof(fts.backend).name.wrapper) + sz_str = string(join(size(fts), "×")) + + path = fts.path + name = fts.name + A = typeof(arch) + + if isnothing(path) + suffix = " on $A" + else + suffix = " of $name at $path" + end + + return string("$sz_str FieldTimeSeries{$B} located at ", show_location(fts), suffix) +end + +function Base.show(io::IO, fts::FieldTimeSeries{LX, LY, LZ, E}) where {LX, LY, LZ, E} + + extrapolation_str = string("├── time 
boundaries: $(E)") + + prefix = string(summary(fts), '\n', + "├── grid: ", summary(fts.grid), '\n', + "├── indices: ", indices_summary(fts), '\n', + "├── time_indexing: ", summary(fts.time_indexing), '\n') + + suffix = field_time_series_suffix(fts) + + return print(io, prefix, suffix) +end + +function field_time_series_suffix(fts::InMemoryFTS) + backend = fts.backend + backend_str = string("├── backend: ", summary(backend)) + path_str = isnothing(fts.path) ? "" : string("├── path: ", fts.path, '\n') + name_str = isnothing(fts.name) ? "" : string("├── name: ", fts.name, '\n') + + return string(backend_str, '\n', + path_str, + name_str, + "└── data: ", summary(fts.data), '\n', + " └── ", data_summary(parent(fts))) +end + +field_time_series_suffix(fts::OnDiskFTS) = + string("├── backend: ", summary(fts.backend), '\n', + "├── path: ", fts.path, '\n', + "└── name: ", fts.name) + diff --git a/src/OutputReaders/update_field_time_series.jl b/src/OutputReaders/update_field_time_series.jl deleted file mode 100644 index b9f86a748e..0000000000 --- a/src/OutputReaders/update_field_time_series.jl +++ /dev/null @@ -1,95 +0,0 @@ -import Oceananigans.BoundaryConditions: BoundaryCondition, getbc -using Oceananigans.AbstractOperations: AbstractOperation -using Oceananigans.Fields: AbstractField - -# Termination (move all here when we switch the code up) -extract_field_timeseries(f::FieldTimeSeries) = f - -const CPUFTSBC = BoundaryCondition{<:Any, <:FieldTimeSeries} -const GPUFTSBC = BoundaryCondition{<:Any, <:GPUAdaptedFieldTimeSeries} - -const FTSBC = Union{CPUFTSBC, GPUFTSBC} - -@inline getbc(bc::FTSBC, i::Int, j::Int, grid::AbstractGrid, clock, args...) = bc.condition[i, j, Time(clock.time)] - -set!(::TotallyInMemoryFieldTimeSeries, index_range) = nothing - -# Set a field with a range of time indices. 
-# We change the index range of the `FieldTimeSeries` -# and load the new data -function set!(fts::InMemoryFieldTimeSeries, index_range::UnitRange) - # TODO: only load new data by comparing current and new index range? - fts.backend.index_range = index_range - set!(fts, fts.path, fts.name) - return nothing -end - -# fallback -update_field_time_series!(::Nothing, time) = nothing -update_field_time_series!(::TotallyInMemoryFieldTimeSeries, ::Int64) = nothing -update_field_time_series!(::TotallyInMemoryFieldTimeSeries, ::Time) = nothing - -# Update the `fts` to contain the time `time_index.time`. -function update_field_time_series!(fts::InMemoryFieldTimeSeries, time_index::Time) - time = time_index.time - n₁, n₂ = index_binary_search(fts.times, time, length(fts.times)) - update_field_time_series!(fts, n₂) - return nothing -end - -# Update `fts` to contain the time index `n`. -# update rules are the following: -# if `n` is 1, load the first `length(fts.backend.index_range)` time steps -# if `n` is within the last `length(fts.backend.index_range)` time steps, load the last `length(fts.backend.index_range)` time steps -# otherwise `n` will be placed at index `[:, :, :, 2]` of `fts.data` -function update_field_time_series!(fts::InMemoryFieldTimeSeries, n::Int) - if !(n ∈ fts.backend.index_range) - Nt = length(fts.times) - Ni = length(fts.backend.index_range) - if n == 1 - set!(fts, 1:Ni) - elseif n > Nt - Ni - set!(fts, Nt-Ni+1:Nt) - else - set!(fts, n-1:n+Ni-2) - end - end - - return nothing -end - -#= -# Utility used to extract field time series from a type through recursion -function extract_field_timeseries(t) - names = propertynames(t) - - #= - if isempty(names) - return nothing - end - =# - - Np = length(names) - return ntuple(Val(Np)) do p - Base.@_inline_meta - name = names[p] - prop = getproperty(t, name) - extract_field_timeseries(prop) - end -end - -# For types that do not contain `FieldTimeSeries`, halt the recursion -NonFTS = [:Number, :AbstractArray] - -for 
NonFTSType in NonFTS - @eval extract_field_timeseries(::$NonFTSType) = nothing -end - -# Special recursion rules for `Tuple` and `Field` types -extract_field_timeseries(t::AbstractField) = Tuple(extract_field_timeseries(getproperty(t, p)) for p in propertynames(t)) -extract_field_timeseries(t::AbstractOperation) = Tuple(extract_field_timeseries(getproperty(t, p)) for p in propertynames(t)) -extract_field_timeseries(t::Tuple) = Tuple(extract_field_timeseries(n) for n in t) -extract_field_timeseries(t::NamedTuple) = Tuple(extract_field_timeseries(n) for n in t) -=# - -extract_field_timeseries(t) = tuple(nothing) diff --git a/src/OutputWriters/OutputWriters.jl b/src/OutputWriters/OutputWriters.jl index c8e06b40ee..1d5de0f6b5 100644 --- a/src/OutputWriters/OutputWriters.jl +++ b/src/OutputWriters/OutputWriters.jl @@ -1,7 +1,7 @@ module OutputWriters export - JLD2OutputWriter, NetCDFOutputWriter, + JLD2OutputWriter, NetCDFOutputWriter, written_names, Checkpointer, WindowedTimeAverage, TimeInterval, IterationInterval, WallTimeInterval, AveragedTimeInterval @@ -31,4 +31,14 @@ include("jld2_output_writer.jl") include("netcdf_output_writer.jl") include("checkpointer.jl") +function written_names(filename) + field_names = String[] + jldopen(filename, "r") do file + all_names = keys(file["timeseries"]) + field_names = filter(n -> n != "t", all_names) + end + return field_names +end + end # module + diff --git a/src/TurbulenceClosures/turbulence_closure_implementations/CATKEVerticalDiffusivities/CATKEVerticalDiffusivities.jl b/src/TurbulenceClosures/turbulence_closure_implementations/CATKEVerticalDiffusivities/CATKEVerticalDiffusivities.jl index 71d27e6719..819b4e458d 100644 --- a/src/TurbulenceClosures/turbulence_closure_implementations/CATKEVerticalDiffusivities/CATKEVerticalDiffusivities.jl +++ b/src/TurbulenceClosures/turbulence_closure_implementations/CATKEVerticalDiffusivities/CATKEVerticalDiffusivities.jl @@ -304,23 +304,56 @@ end # "Patankar trick" for buoyancy 
production (cf Patankar 1980 or Burchard et al. 2003) # If buoyancy flux is a _sink_ of TKE, we treat it implicitly. - wb = explicit_buoyancy_flux(i, j, k, grid, closure, velocities, tracers, buoyancy, diffusivities) + wb = explicit_buoyancy_flux(i, j, k, grid, closure_ij, velocities, tracers, buoyancy, diffusivities) eⁱʲᵏ = @inbounds tracers.e[i, j, k] + eᵐⁱⁿ = closure_ij.minimum_turbulent_kinetic_energy # See `buoyancy_flux` - dissipative_buoyancy_flux = sign(wb) * sign(eⁱʲᵏ) < 0 + dissipative_buoyancy_flux = (sign(wb) * sign(eⁱʲᵏ) < 0) & (eⁱʲᵏ > eᵐⁱⁿ) wb_e = ifelse(dissipative_buoyancy_flux, wb / eⁱʲᵏ, zero(grid)) - # Implicit TKE flux at solid bottoms (extra damping for TKE near boundaries) + # Treat the divergence of TKE flux at solid bottoms implicitly. + # This will damp TKE near boundaries. The bottom-localized TKE flux may be written + # + # ∂t e = - δ(z + h) ∇ ⋅ Jᵉ + ⋯ + # ∂t e = + δ(z + h) Jᵉ / Δz + ⋯ + # + # where δ(z + h) is a δ-function that is 0 everywhere except adjacent to the bottom boundary + # at $z = -h$ and Δz is the grid spacing at the bottom + # + # Thus if + # + # Jᵉ ≡ - Cᵂϵ * √e³ + # = - (Cᵂϵ * √e) e + # + # Then the contribution of Jᵉ to the implicit flux is + # + # Lᵂ = - Cᵂϵ * √e / Δz. 
+ # on_bottom = !inactive_cell(i, j, k, grid) & inactive_cell(i, j, k-1, grid) Δz = Δzᶜᶜᶜ(i, j, k, grid) Cᵂϵ = closure_ij.turbulent_kinetic_energy_equation.Cᵂϵ - Q_e = - Cᵂϵ * turbulent_velocityᶜᶜᶜ(i, j, k, grid, closure_ij, tracers.e) / Δz * on_bottom + e⁺ = clip(eⁱʲᵏ) + w★ = sqrt(e⁺) + div_Jᵉ_e = - on_bottom * Cᵂϵ * w★ / Δz # Implicit TKE dissipation - ω_e = dissipation_rate(i, j, k, grid, closure_ij, velocities, tracers, buoyancy, diffusivities) + ω = dissipation_rate(i, j, k, grid, closure_ij, velocities, tracers, buoyancy, diffusivities) - diffusivities.Lᵉ[i, j, k] = - wb_e - ω_e + Q_e + # The interior contributions to the linear implicit term `L` are defined via + # + # ∂t e = Lⁱ e + ⋯, + # + # So + # + # Lⁱ e = wb - ϵ + # = (wb / e - ω) e, + # ↖--------↗ + # = Lⁱ + # + # where ω = ϵ / e ∼ √e / ℓ. + + diffusivities.Lᵉ[i, j, k] = - wb_e - ω + div_Jᵉ_e end end diff --git a/src/TurbulenceClosures/turbulence_closure_implementations/ri_based_vertical_diffusivity.jl b/src/TurbulenceClosures/turbulence_closure_implementations/ri_based_vertical_diffusivity.jl index 16dc985252..b65de06ff4 100644 --- a/src/TurbulenceClosures/turbulence_closure_implementations/ri_based_vertical_diffusivity.jl +++ b/src/TurbulenceClosures/turbulence_closure_implementations/ri_based_vertical_diffusivity.jl @@ -14,6 +14,8 @@ struct RiBasedVerticalDiffusivity{TD, FT, R} <: AbstractScalarDiffusivity{TD, Ve Ri₀ :: FT Riᵟ :: FT Ri_dependent_tapering :: R + maximum_diffusivity :: FT + maximum_viscosity :: FT end function RiBasedVerticalDiffusivity{TD}(ν₀::FT, @@ -23,10 +25,15 @@ function RiBasedVerticalDiffusivity{TD}(ν₀::FT, Cᵃᵛ::FT, Ri₀::FT, Riᵟ::FT, - Ri_dependent_tapering::R) where {TD, FT, R} + Ri_dependent_tapering::R, + maximum_diffusivity::FT, + maximum_viscosity::FT) where {TD, FT, R} + return RiBasedVerticalDiffusivity{TD, FT, R}(ν₀, κ₀, κᶜᵃ, Cᵉⁿ, Cᵃᵛ, Ri₀, Riᵟ, - Ri_dependent_tapering) + Ri_dependent_tapering, + maximum_diffusivity, + maximum_viscosity) end # Ri-dependent tapering 
flavor @@ -86,10 +93,17 @@ Keyword arguments * `Ri₀`: ``Ri`` threshold for decreasing viscosity and diffusivity. * `Riᵟ`: ``Ri``-width over which viscosity and diffusivity decreases to 0. + +* `maximum_diffusivity`: A cap on the diffusivity + +* `maximum_viscosity`: A cap on viscosity + """ function RiBasedVerticalDiffusivity(time_discretization = VerticallyImplicitTimeDiscretization(), FT = Float64; Ri_dependent_tapering = HyperbolicTangentRiDependentTapering(), + maximum_diffusivity = Inf, + maximum_viscosity = Inf, ν₀ = 0.7, κ₀ = 0.5, κᶜᵃ = 1.7, @@ -108,9 +122,16 @@ function RiBasedVerticalDiffusivity(time_discretization = VerticallyImplicitTime TD = typeof(time_discretization) - return RiBasedVerticalDiffusivity{TD}(FT(ν₀), FT(κ₀), FT(κᶜᵃ), FT(Cᵉⁿ), - FT(Cᵃᵛ), FT(Ri₀), FT(Riᵟ), - Ri_dependent_tapering) + return RiBasedVerticalDiffusivity{TD}(convert(FT, ν₀), + convert(FT, κ₀), + convert(FT, κᶜᵃ), + convert(FT, Cᵉⁿ), + convert(FT, Cᵃᵛ), + convert(FT, Ri₀), + convert(FT, Riᵟ), + Ri_dependent_tapering, + convert(FT, maximum_diffusivity), + convert(FT, maximum_viscosity)) end RiBasedVerticalDiffusivity(FT::DataType; kw...) = @@ -244,8 +265,9 @@ end N²_above = ∂z_b(i, j, k+1, grid, buoyancy, tracers) # Conditions + # TODO: apply a minimum entrainment buoyancy gradient? 
convecting = N² < 0 # applies regardless of Qᵇ - entraining = (N²_above < 0) & (!convecting) & (Qᵇ > 0) + entraining = (N² > 0) & (N²_above < 0) & (Qᵇ > 0) # Convective adjustment diffusivity κᶜᵃ = ifelse(convecting, κᶜᵃ, zero(grid)) @@ -267,6 +289,10 @@ end κᶜ⁺ = κᶜᵃ + κᵉⁿ + κᶜ★ κᵘ⁺ = κᵘ★ + # Limit by specified maximum + κᶜ⁺ = max(κᶜ⁺, closure_ij.maximum_diffusivity) + κᵘ⁺ = max(κᵘ⁺, closure_ij.maximum_viscosity) + # Set to zero on periphery and NaN within inactive region on_periphery = peripheral_node(i, j, k, grid, c, c, f) within_inactive = inactive_node(i, j, k, grid, c, c, f) @@ -276,6 +302,7 @@ end # Update by averaging in time @inbounds κᶜ[i, j, k] = (Cᵃᵛ * κᶜ[i, j, k] + κᶜ⁺) / (1 + Cᵃᵛ) @inbounds κᵘ[i, j, k] = (Cᵃᵛ * κᵘ[i, j, k] + κᵘ⁺) / (1 + Cᵃᵛ) + return nothing end @@ -293,6 +320,7 @@ function Base.show(io::IO, closure::RiBasedVerticalDiffusivity) print(io, "├── Cᵉⁿ: ", prettysummary(closure.Cᵉⁿ), '\n') print(io, "├── Cᵃᵛ: ", prettysummary(closure.Cᵃᵛ), '\n') print(io, "├── Ri₀: ", prettysummary(closure.Ri₀), '\n') - print(io, "└── Riᵟ: ", prettysummary(closure.Riᵟ)) + print(io, "├── maximum_diffusivity: ", prettysummary(closure.maximum_diffusivity), '\n') + print(io, "└── maximum_viscosity: ", prettysummary(closure.maximum_viscosity)) end diff --git a/test/test_output_readers.jl b/test/test_output_readers.jl index fcb7c0fd93..15e5cb982c 100644 --- a/test/test_output_readers.jl +++ b/test/test_output_readers.jl @@ -2,6 +2,7 @@ include("dependencies_for_runtests.jl") using Oceananigans.Utils: Time using Oceananigans.Fields: indices +using Oceananigans.OutputReaders: Cyclical, Clamp function generate_some_interesting_simulation_data(Nx, Ny, Nz; architecture=CPU()) grid = RectilinearGrid(architecture, size=(Nx, Ny, Nz), extent=(64, 64, 32)) @@ -261,11 +262,51 @@ end @testset "Test chunked abstraction" begin @info " Testing Chunked abstraction..." 
filepath = "testfile.jld2" - f = FieldTimeSeries(filepath, "c") - f_chunked = FieldTimeSeries(filepath, "c"; backend = InMemory(; chunk_size = 2)) + fts = FieldTimeSeries(filepath, "c") + fts_chunked = FieldTimeSeries(filepath, "c"; backend = InMemory(2), time_indexing = Cyclical()) - for t in eachindex(f.times) - f_chunked[t] == f[t] + for t in eachindex(fts.times) + fts_chunked[t] == fts[t] + end + + min_fts, max_fts = extrema(fts) + + # Test cyclic time interpolation with update_field_time_series! + times = map(Time, 0:0.1:300) + for time in times + @test minimum(fts_chunked[time]) ≥ min_fts + @test maximum(fts_chunked[time]) ≤ max_fts + end + end + + @testset "Time Interpolation" begin + times = rand(100) * 100 + times = sort(times) # random times between 0 and 100 + + min_t, max_t = extrema(times) + + grid = RectilinearGrid(size = (1, 1, 1), extent = (1, 1, 1)) + + fts_cyclic = FieldTimeSeries{Nothing, Nothing, Nothing}(grid, times; time_indexing = Cyclical()) + fts_clamp = FieldTimeSeries{Nothing, Nothing, Nothing}(grid, times; time_indexing = Clamp()) + + for t in eachindex(times) + fill!(fts_cyclic[t], t / 2) # value of the field between 0.5 and 50 + fill!(fts_clamp[t], t / 2) # value of the field between 0.5 and 50 + end + + # Let's test that the field remains bounded between 0.5 and 50 + for time in Time.(collect(0:0.1:100)) + @test fts_cyclic[1, 1, 1, time] ≤ 50 + @test fts_cyclic[1, 1, 1, time] ≥ 0.5 + + if time.time > max_t + @test fts_clamp[1, 1, 1, time] == 50 + elseif time.time < min_t + @test fts_clamp[1, 1, 1, time] == 0.5 + else + @test fts_clamp[1, 1, 1, time] ≈ fts_cyclic[1, 1, 1, time] + end end end