Skip to content

Commit

Permalink
added stream correctness test and force read/write option
Browse files Browse the repository at this point in the history
   - also fixed one dimension read bug
  • Loading branch information
philipwjones committed Oct 3, 2024
1 parent 1b10bcd commit 8d8afce
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 25 deletions.
51 changes: 30 additions & 21 deletions components/omega/src/infra/IOStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ int IOStream::finalize(

std::string StreamName = Iter->first;
std::shared_ptr<IOStream> ThisStream = Iter->second;
bool ForceWrite = false;

int Err1 = 0;
if (ThisStream->OnShutdown)
Err1 = ThisStream->writeStream(ModelClock, FinalCall);
Err1 = ThisStream->writeStream(ModelClock, ForceWrite, FinalCall);

if (Err1 != 0) {
LOG_ERROR("Error trying to write stream {} at shutdown", StreamName);
Expand Down Expand Up @@ -236,7 +237,8 @@ bool IOStream::validateAll() {
int IOStream::read(
const std::string &StreamName, // [in] Name of stream
const Clock &ModelClock, // [in] Model clock for time info
Metadata &ReqMetadata // [inout] global metadata requested from file
Metadata &ReqMetadata, // [inout] global metadata requested from file
bool ForceRead // [in] optional: read even if not time
) {
int Err = 0; // default return code

Expand All @@ -245,7 +247,7 @@ int IOStream::read(
if (StreamItr != AllStreams.end()) {
// Stream found, call the read function
std::shared_ptr<IOStream> ThisStream = StreamItr->second;
Err = ThisStream->readStream(ModelClock, ReqMetadata);
Err = ThisStream->readStream(ModelClock, ReqMetadata, ForceRead);
} else { // Stream not found, return error
LOG_ERROR("Unable to read stream {}. Stream not defined", StreamName);
Err = 1;
Expand All @@ -259,7 +261,8 @@ int IOStream::read(
// Writes a single stream if it is time. Returns an error code.
int IOStream::write(
const std::string &StreamName, // [in] Name of stream
const Clock &ModelClock // [in] Model clock needed for time stamps
const Clock &ModelClock, // [in] Model clock needed for time stamps
bool ForceWrite // [in] optional: write even if not time
) {
int Err = 0; // default return code

Expand All @@ -268,7 +271,7 @@ int IOStream::write(
if (StreamItr != AllStreams.end()) {
// Stream found, call the write function
std::shared_ptr<IOStream> ThisStream = StreamItr->second;
Err = ThisStream->writeStream(ModelClock);
Err = ThisStream->writeStream(ModelClock, ForceWrite);
} else {
// Stream not found, return error
LOG_ERROR("Unable to write stream {}. Stream not defined", StreamName);
Expand Down Expand Up @@ -650,10 +653,10 @@ int IOStream::defineAllDims(
// If dimension not found, only generate a warning since there
// may be some dimensions that are not required
I4 InLength;
Err = IO::getDimFromFile(FileID, DimName, DimID, Length);
Err = IO::getDimFromFile(FileID, DimName, DimID, InLength);
if (Err != 0) { // can't find dim in file
// Try again using old name for back compatibility to MPAS
Err = IO::getDimFromFile(FileID, OldDimName, DimID, Length);
Err = IO::getDimFromFile(FileID, OldDimName, DimID, InLength);
if (Err == 0) {
LOG_INFO("Ignore PIO Error for Dimension {}: ", DimName);
LOG_INFO("Found under old dimension name {}: ", OldDimName);
Expand Down Expand Up @@ -2200,7 +2203,8 @@ int IOStream::readFieldData(
// read function used by the public read interface.
int IOStream::readStream(
const Clock &ModelClock, // [in] model clock for getting time
Metadata &ReqMetadata // [inout] global metadata to extract from file
Metadata &ReqMetadata, // [inout] global metadata to extract from file
bool ForceRead // [in] optional: read even if not time
) {
int Err = 0; // default return code

Expand All @@ -2212,13 +2216,15 @@ int IOStream::readStream(
}

// If it is not time to read, return
if (!MyAlarm.isRinging() and !OnStartup)
return Err;
if (UseStartEnd) { // If time outside interval, return
if (!StartAlarm.isRinging())
return Err;
if (EndAlarm.isRinging())
if (!ForceRead) {
if (!MyAlarm.isRinging() and !OnStartup)
return Err;
if (UseStartEnd) { // If time outside interval, return
if (!StartAlarm.isRinging())
return Err;
if (EndAlarm.isRinging())
return Err;
}
}

// Get current simulation time and time string
Expand Down Expand Up @@ -2345,6 +2351,7 @@ int IOStream::readStream(
// public write interfaces.
int IOStream::writeStream(
const Clock &ModelClock, // [in] Model clock needed for time stamps
bool ForceWrite, // [in] Optional: write even if not time
bool FinalCall // [in] Optional flag if called from finalize
) {

Expand All @@ -2358,14 +2365,16 @@ int IOStream::writeStream(
}

// If it is not time to write, return
bool StartupShutdown = OnStartup or (OnShutdown and FinalCall);
if (!MyAlarm.isRinging() and !StartupShutdown)
return Err;
if (UseStartEnd) { // If time outside interval, return
if (!StartAlarm.isRinging())
return Err;
if (EndAlarm.isRinging())
if (!ForceWrite) {
bool StartupShutdown = OnStartup or (OnShutdown and FinalCall);
if (!MyAlarm.isRinging() and !StartupShutdown)
return Err;
if (UseStartEnd) { // If time outside interval, return
if (!StartAlarm.isRinging())
return Err;
if (EndAlarm.isRinging())
return Err;
}
}

// Get current simulation time and time string
Expand Down
10 changes: 7 additions & 3 deletions components/omega/src/infra/IOStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ class IOStream {
/// public read method
int readStream(
const Clock &ModelClock, ///< [in] Model clock for alarms, time stamp
Metadata &ReqMetadata ///< [inout] global metadata to extract from file
Metadata &ReqMetadata, ///< [inout] global metadata to extract from file
bool ForceRead = false ///< [in] Optional: read even if not time
);

/// Private function that performs most of the stream write - called by the
/// public write method
int writeStream(
const Clock &ModelClock, ///< [in] Model clock for alarms, time stamp
bool ForceWrite = false, ///< [in] Optional: write even if not time
bool FinalCall = false ///< [in] Optional flag for shutdown
);

Expand Down Expand Up @@ -221,14 +223,16 @@ class IOStream {
/// Reads a stream if it is time. Returns an error code.
static int read(const std::string &StreamName, ///< [in] Name of stream
const Clock &ModelClock, ///< [in] Model clock for time info
Metadata &ReqMetadata ///< [inout] Metadata desired from file
Metadata &ReqMetadata, ///< [inout] Metadata desired in file
bool ForceRead = false ///< [in] opt: read even if not time
);

//---------------------------------------------------------------------------
/// Writes a stream if it is time. Returns an error code.
static int
write(const std::string &StreamName, ///< [in] Name of stream
const Clock &ModelClock ///< [in] Model clock for time stamps
const Clock &ModelClock, ///< [in] Model clock for time stamps
bool ForceWrite = false ///< [in] opt: write even if not time
);

//---------------------------------------------------------------------------
Expand Down
31 changes: 30 additions & 1 deletion components/omega/test/infra/IOStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,18 @@ int main(int argc, char **argv) {
Err1 = initIOStreamTest(ModelClock, CalGreg);
TestEval("Initialize IOStream test", Err1, ErrRef, Err);

// Retrieve dimension lengths
// Retrieve dimension lengths and some mesh info
I4 NCellsSize = Dimension::getDimLengthLocal("NCells");
I4 NVertLevels = Dimension::getDimLengthLocal("NVertLevels");
Decomp *DefDecomp = Decomp::getDefault();
I4 NCellsOwned = DefDecomp->NCellsOwned;
Array1DI4 CellID = DefDecomp->CellID;

// Create data arrays

Array2DR8 Temp("Temp", NCellsSize, NVertLevels);
Array2DR8 Salt("Salt", NCellsSize, NVertLevels);
Array2DR8 Test("Test", NCellsSize, NVertLevels);

// Attach data arrays to fields

Expand All @@ -222,12 +226,20 @@ int main(int argc, char **argv) {
Err1 = IOStream::read("InitialState", *ModelClock, ReqMetadata);
TestEval("Read restart file", Err1, ErrRef, Err);

// Overwrite salinity array with values associated with global cell
// ID to test proper indexing of IO
parallelFor( {NCellsSize, NVertLevels}, KOKKOS_LAMBDA(int Cell, int K) {
Salt(Cell, K) = 0.0001_Real*(CellID(Cell) + K);
Test(Cell, K) = Salt(Cell, K);
});

// Create a stop alarm at 1 year for time stepping
TimeInstant StopTime(&CalGreg, 0002, 1, 1, 0, 0, 0.0);
Alarm StopAlarm("Stop Time", StopTime);
Err1 = ModelClock->attachAlarm(&StopAlarm);
TestEval("Attach stop alarm", Err1, ErrRef, Err);

// Overwrite
// Step forward in time and write files if it is time
while (!StopAlarm.isRinging()) {
ModelClock->advance();
Expand All @@ -239,6 +251,23 @@ int main(int argc, char **argv) {
TestEval("Write all streams " + CurTimeStr, Err1, ErrRef, Err);
}

// Force read the latest restart and check the results
bool ForceRead = true;
Err1 = IOStream::read("RestartRead", *ModelClock, ReqMetadata, ForceRead);
TestEval("Restart force read", Err1, ErrRef, Err);

Err1 = 0;
auto DataReducer = Kokkos::Sum<I4>(Err1);

parallelReduce(
{NCellsOwned, NVertLevels},
KOKKOS_LAMBDA(int Cell, int K, I4 &Err1) {
if (Salt(Cell,K) != Test(Cell,K))
++Err1;
},
DataReducer);
TestEval("Check Salt array ", Err1, ErrRef, Err);

// Write final output and remove all streams
IOStream::finalize(*ModelClock);
}
Expand Down

0 comments on commit 8d8afce

Please sign in to comment.