Skip to content

Commit

Permalink
Merge pull request #180 from lzaslav/multifile_log_reader
Browse files Browse the repository at this point in the history
Correct usage of enumerators and yield statements
  • Loading branch information
Stuart Rhea authored Jun 22, 2022
2 parents 4667b9a + 81a1a29 commit e320f1a
Showing 1 changed file with 66 additions and 44 deletions.
110 changes: 66 additions & 44 deletions ISOv4Plugin/Mappers/MultiFileTimeLogMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,75 +45,97 @@ public override IEnumerable<OperationData> ImportTimeLogs(ISOTask loggedTask, IE
protected override IEnumerable<ISOSpatialRow> ReadTimeLog(ISOTimeLog _timeLog, string _dataPath)
{
List<BinaryReaderHelper> readers = new List<BinaryReaderHelper>();

try
{
// Obtain binary readers for each time log
foreach (var timeLog in _timeLogs)
readers = CreateBinaryReaders();

// Below alogrithm is using queues for each binary file and matching records on TimeStart/Position.
// At start of each iteration a single record is read from binary file into queue.
// Records with earliest TimeStart are merged together and removed from each file queue.
while (true)
{
var reader = base.ReadTimeLog(timeLog, TaskDataPath);
if (reader != null)
// Read next record from each time log
var readersWithData = ReadNextRecords(readers);

if (readersWithData.Count == 0)
{
readers.Add(new BinaryReaderHelper
{
Enumerator = reader.GetEnumerator()
});
// No more records in each file. Stop processing.
break;
}
}

return ReadFromBinaryReaders(readers);
// Group records by TimeStart and East/North position, and then grab ones with earliest TimeStart.
// This leads to processing earliest records from any file first and keeping other records untouched.
// They will be processed in the next loop iteration along with any records read from already processed files.
var candidates = readersWithData.GroupBy(x => new { x.CurrentRecord.TimeStart, x.CurrentRecord.EastPosition, x.CurrentRecord.NorthPosition })
.OrderBy(x => x.Key.TimeStart)
.First().ToList();

// Merge data from all candidates
ISOSpatialRow result = MergeRecords(candidates);

yield return result;
}
}
finally
{
// Clean up readers
foreach (var reader in readers)
{
reader.Enumerator?.Dispose();
}
DisposeBinaryReaders(readers);
}
}

private IEnumerable<ISOSpatialRow> ReadFromBinaryReaders(List<BinaryReaderHelper> readers)
private List<BinaryReaderHelper> CreateBinaryReaders()
{
// Below alogrithm is using queues for each binary file and matching records on TimeStart/Position.
// At start of each iteration a single record is read from binary file into queue.
// Records with earliest TimeStart are merged together and removed from each file queue.
while (true)
List<BinaryReaderHelper> readers = new List<BinaryReaderHelper>();
// Obtain binary readers for each time log
foreach (var timeLog in _timeLogs)
{
// Read next record from each time log
foreach (var reader in readers)
var reader = base.ReadTimeLog(timeLog, TaskDataPath);
if (reader != null)
{
if (reader.CurrentRecord == null)
readers.Add(new BinaryReaderHelper
{
reader.CurrentRecord = reader.Enumerator.MoveNext() ? reader.Enumerator.Current : null;
}
Enumerator = reader.GetEnumerator()
});
}
}

// Only get readers which still have records;
var readersWithData = readers.Where(x => x.CurrentRecord != null).ToList();
if (readersWithData.Count == 0)
{
// No more records in each file. Stop processing.
break;
}
return readers;
}

// Group records by TimeStart and East/North position, and then grab ones with earliest TimeStart.
// This leads to processing earliest records from any file first and keeping other records untouched.
// They will be processed in the next loop iteration along with any records read from already processed files.
var candidates = readersWithData.GroupBy(x => new { x.CurrentRecord.TimeStart, x.CurrentRecord.EastPosition, x.CurrentRecord.NorthPosition })
.OrderBy(x => x.Key.TimeStart)
.First().ToList();
private void DisposeBinaryReaders(List<BinaryReaderHelper> readers)
{
foreach (var reader in readers)
{
reader.Enumerator?.Dispose();
}
}

// Merge data from all candidates into first record
ISOSpatialRow result = null;
foreach (var candidate in candidates)
private List<BinaryReaderHelper> ReadNextRecords(List<BinaryReaderHelper> readers)
{
foreach (var reader in readers)
{
if (reader.CurrentRecord == null)
{
result = result == null ? candidate.CurrentRecord : result.Merge(candidate.CurrentRecord);
// Clear current record to force reading next one
candidate.CurrentRecord = null;
reader.CurrentRecord = reader.Enumerator.MoveNext() ? reader.Enumerator.Current : null;
}
}

// Only return readers which still have records
return readers.Where(x => x.CurrentRecord != null).ToList();
}

yield return result;
private ISOSpatialRow MergeRecords(List<BinaryReaderHelper> candidates)
{
// Merge data from all candidates into first record
ISOSpatialRow result = null;
foreach (var candidate in candidates)
{
result = result == null ? candidate.CurrentRecord : result.Merge(candidate.CurrentRecord);
// Clear current record to force reading next one
candidate.CurrentRecord = null;
}
return result;
}

protected override ISOTime GetTimeElementFromTimeLog(ISOTimeLog isoTimeLog)
Expand Down

0 comments on commit e320f1a

Please sign in to comment.