From f1a82828fed8950f9a3f1586c44327027627a0c8 Mon Sep 17 00:00:00 2001 From: Peter Johnson Date: Fri, 3 Nov 2023 20:34:43 -0700 Subject: [PATCH] [wpiutil] Add DataLog and DataLogManager Stop() (#5860) Restarting a stopped log results in creating a new log file with fresh copies of the same start records and schema data records. Also check to see if the file has been deleted or if the log file exceeds 1.8 GB, and start a new one. --- .../src/main/native/cpp/DataLogManager.cpp | 23 +- .../src/main/native/include/DataLogManager.h | 10 + ntcoreffi/src/main/native/symbols.txt | 2 + .../src/main/native/cpp/DataLogManager.cpp | 19 +- .../main/native/include/frc/DataLogManager.h | 5 + .../edu/wpi/first/wpilibj/DataLogManager.java | 69 ++-- .../edu/wpi/first/util/datalog/DataLog.java | 11 +- .../wpi/first/util/datalog/DataLogJNI.java | 2 + wpiutil/src/main/native/cpp/DataLog.cpp | 325 +++++++++++++----- .../src/main/native/cpp/jni/DataLogJNI.cpp | 16 + wpiutil/src/main/native/include/wpi/DataLog.h | 43 ++- 11 files changed, 394 insertions(+), 131 deletions(-) diff --git a/ntcoreffi/src/main/native/cpp/DataLogManager.cpp b/ntcoreffi/src/main/native/cpp/DataLogManager.cpp index 023facd628e..e1e49b747b7 100644 --- a/ntcoreffi/src/main/native/cpp/DataLogManager.cpp +++ b/ntcoreffi/src/main/native/cpp/DataLogManager.cpp @@ -264,12 +264,18 @@ Thread::~Thread() { void Thread::Main() { // based on free disk space, scan for "old" FRC_*.wpilog files and remove { - uintmax_t freeSpace = fs::space(m_logDir).free; + std::error_code ec; + uintmax_t freeSpace; + auto freeSpaceInfo = fs::space(m_logDir, ec); + if (!ec) { + freeSpace = freeSpaceInfo.available; + } else { + freeSpace = UINTMAX_MAX; + } if (freeSpace < kFreeSpaceThreshold) { // Delete oldest FRC_*.wpilog files (ignore FRC_TBD_*.wpilog as we just // created one) std::vector entries; - std::error_code ec; for (auto&& entry : fs::directory_iterator{m_logDir, ec}) { auto stem = entry.path().stem().string(); if (wpi::starts_with(stem, "FRC_") && @@ -462,6 +468,9 @@ static Instance& GetInstance(std::string_view dir = "", std::string_view filename = "", double period = 0.25) { static Instance instance(dir, filename, period); + if (!instance.owner) { + instance.owner.Start(MakeLogDir(dir), filename, period); + } return instance; } @@ -470,6 +479,12 @@ void DataLogManager::Start(std::string_view dir, std::string_view filename, GetInstance(dir, filename, period); } +void DataLogManager::Stop() { + auto& inst = GetInstance(); + inst.owner.GetThread()->m_log.Stop(); + inst.owner.Stop(); +} + void DataLogManager::Log(std::string_view message) { GetInstance().owner.GetThread()->m_messageLog.Append(message); fmt::print("{}\n", message); @@ -503,6 +518,10 @@ void DLM_Start(const char* dir, const char* filename, double period) { DataLogManager::Start(dir, filename, period); } +void DLM_Stop(void) { + DataLogManager::Stop(); +} + void DLM_Log(const char* message) { DataLogManager::Log(message); } diff --git a/ntcoreffi/src/main/native/include/DataLogManager.h b/ntcoreffi/src/main/native/include/DataLogManager.h index 8445399dde9..d2635c17e40 100644 --- a/ntcoreffi/src/main/native/include/DataLogManager.h +++ b/ntcoreffi/src/main/native/include/DataLogManager.h @@ -52,6 +52,11 @@ class DataLogManager final { static void Start(std::string_view dir = "", std::string_view filename = "", double period = 0.25); + /** + * Stop data log manager. + */ + static void Stop(); + /** * Log a message to the "messages" entry. The message is also printed to * standard output (followed by a newline). @@ -110,6 +115,11 @@ struct WPI_DataLog; */ void DLM_Start(const char* dir, const char* filename, double period); +/** + * Stop data log manager. + */ +void DLM_Stop(void); + /** * Log a message to the "messages" entry. The message is also printed to * standard output (followed by a newline). diff --git a/ntcoreffi/src/main/native/symbols.txt b/ntcoreffi/src/main/native/symbols.txt index f7f299a6cdf..c18014b71c8 100644 --- a/ntcoreffi/src/main/native/symbols.txt +++ b/ntcoreffi/src/main/native/symbols.txt @@ -4,6 +4,7 @@ DLM_Log DLM_LogNetworkTables DLM_SignalNewDSDataOccur DLM_Start +DLM_Stop NT_AddListener NT_AddListenerMultiple NT_AddListenerSingle @@ -245,6 +246,7 @@ WPI_DataLog_Resume WPI_DataLog_SetFilename WPI_DataLog_SetMetadata WPI_DataLog_Start +WPI_DataLog_Stop WPI_DestroyEvent WPI_DestroySemaphore WPI_DestroySignalObject diff --git a/wpilibc/src/main/native/cpp/DataLogManager.cpp b/wpilibc/src/main/native/cpp/DataLogManager.cpp index 3907db8432c..afe9330daa5 100644 --- a/wpilibc/src/main/native/cpp/DataLogManager.cpp +++ b/wpilibc/src/main/native/cpp/DataLogManager.cpp @@ -111,12 +111,18 @@ Thread::~Thread() { void Thread::Main() { // based on free disk space, scan for "old" FRC_*.wpilog files and remove { - uintmax_t freeSpace = fs::space(m_logDir).available; + std::error_code ec; + uintmax_t freeSpace; + auto freeSpaceInfo = fs::space(m_logDir, ec); + if (!ec) { + freeSpace = freeSpaceInfo.available; + } else { + freeSpace = UINTMAX_MAX; + } if (freeSpace < kFreeSpaceThreshold) { // Delete oldest FRC_*.wpilog files (ignore FRC_TBD_*.wpilog as we just // created one) std::vector entries; - std::error_code ec; for (auto&& entry : fs::directory_iterator{m_logDir, ec}) { auto stem = entry.path().stem().string(); if (wpi::starts_with(stem, "FRC_") && @@ -308,6 +314,9 @@ static Instance& GetInstance(std::string_view dir = "", std::string_view filename = "", double period = 0.25) { static Instance instance(dir, filename, period); + if (!instance.owner) { + instance.owner.Start(MakeLogDir(dir), filename, period); + } return instance; } @@ -316,6 +325,12 @@ void DataLogManager::Start(std::string_view dir, std::string_view filename, GetInstance(dir, filename, period); } +void DataLogManager::Stop() { + auto& inst = GetInstance(); + inst.owner.GetThread()->m_log.Stop(); + inst.owner.Stop(); +} + void DataLogManager::Log(std::string_view message) { GetInstance().owner.GetThread()->m_messageLog.Append(message); fmt::print("{}\n", message); diff --git a/wpilibc/src/main/native/include/frc/DataLogManager.h b/wpilibc/src/main/native/include/frc/DataLogManager.h index fa7abbaec53..336af4a01ea 100644 --- a/wpilibc/src/main/native/include/frc/DataLogManager.h +++ b/wpilibc/src/main/native/include/frc/DataLogManager.h @@ -49,6 +49,11 @@ class DataLogManager final { static void Start(std::string_view dir = "", std::string_view filename = "", double period = 0.25); + /** + * Stop data log manager. + */ + static void Stop(); + /** * Log a message to the "messages" entry. The message is also printed to * standard output (followed by a newline). diff --git a/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java b/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java index ba2d7808f3f..8098ae9af95 100644 --- a/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java +++ b/wpilibj/src/main/java/edu/wpi/first/wpilibj/DataLogManager.java @@ -41,9 +41,10 @@ */ public final class DataLogManager { private static DataLog m_log; + private static boolean m_stopped; private static String m_logDir; private static boolean m_filenameOverride; - private static final Thread m_thread; + private static Thread m_thread; private static final ZoneId m_utc = ZoneId.of("UTC"); private static final DateTimeFormatter m_timeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss").withZone(m_utc); @@ -59,11 +60,6 @@ public final class DataLogManager { private DataLogManager() {} - static { - m_thread = new Thread(DataLogManager::logMain, "DataLogDS"); - m_thread.setDaemon(true); - } - /** Start data log manager with default directory location. */ public static synchronized void start() { start("", "", 0.25); @@ -100,33 +96,52 @@ public static synchronized void start(String dir, String filename) { * tradeoff */ public static synchronized void start(String dir, String filename, double period) { - if (m_log != null) { - return; - } - m_logDir = makeLogDir(dir); - m_filenameOverride = !filename.isEmpty(); + if (m_log == null) { + m_logDir = makeLogDir(dir); + m_filenameOverride = !filename.isEmpty(); - // Delete all previously existing FRC_TBD_*.wpilog files. These only exist when the robot - // never connects to the DS, so they are very unlikely to have useful data and just clutter - // the filesystem. - File[] files = - new File(m_logDir) - .listFiles((d, name) -> name.startsWith("FRC_TBD_") && name.endsWith(".wpilog")); - if (files != null) { - for (File file : files) { - if (!file.delete()) { - System.err.println("DataLogManager: could not delete " + file); + // Delete all previously existing FRC_TBD_*.wpilog files. These only exist when the robot + // never connects to the DS, so they are very unlikely to have useful data and just clutter + // the filesystem. + File[] files = + new File(m_logDir) + .listFiles((d, name) -> name.startsWith("FRC_TBD_") && name.endsWith(".wpilog")); + if (files != null) { + for (File file : files) { + if (!file.delete()) { + System.err.println("DataLogManager: could not delete " + file); + } } } + m_log = new DataLog(m_logDir, makeLogFilename(filename), period); + m_messageLog = new StringLogEntry(m_log, "messages"); + + // Log all NT entries and connections + if (m_ntLoggerEnabled) { + startNtLog(); + } + } else if (m_stopped) { + m_log.setFilename(makeLogFilename(filename)); + m_log.resume(); + m_stopped = false; } - m_log = new DataLog(m_logDir, makeLogFilename(filename), period); - m_messageLog = new StringLogEntry(m_log, "messages"); - m_thread.start(); + if (m_thread == null) { + m_thread = new Thread(DataLogManager::logMain, "DataLogDS"); + m_thread.setDaemon(true); + m_thread.start(); + } + } - // Log all NT entries and connections - if (m_ntLoggerEnabled) { - startNtLog(); + /** Stop data log manager. */ + public static synchronized void stop() { + if (m_thread != null) { + m_thread.interrupt(); + m_thread = null; + } + if (m_log != null) { + m_log.stop(); + m_stopped = true; } } diff --git a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java index 025487cd299..97c629f6a24 100644 --- a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java +++ b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLog.java @@ -106,11 +106,20 @@ public void pause() { DataLogJNI.pause(m_impl); } - /** Resumes appending of data records to the log. */ + /** + * Resumes appending of data records to the log. If called after stop(), opens a new file (with + * random name if SetFilename was not called after stop()) and appends Start records and schema + * data values for all previously started entries and schemas. + */ public void resume() { DataLogJNI.resume(m_impl); } + /** Stops appending all records to the log, and closes the log file. */ + public void stop() { + DataLogJNI.stop(m_impl); + } + /** * Returns whether there is a data schema already registered with the given name. * diff --git a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java index 08b108c0830..f94a86f057d 100644 --- a/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java +++ b/wpiutil/src/main/java/edu/wpi/first/util/datalog/DataLogJNI.java @@ -18,6 +18,8 @@ public class DataLogJNI extends WPIUtilJNI { static native void resume(long impl); + static native void stop(long impl); + static native void addSchema(long impl, String name, String type, byte[] schema, long timestamp); static native void addSchemaString( diff --git a/wpiutil/src/main/native/cpp/DataLog.cpp b/wpiutil/src/main/native/cpp/DataLog.cpp index 9994618fb5b..d05a49e43b7 100644 --- a/wpiutil/src/main/native/cpp/DataLog.cpp +++ b/wpiutil/src/main/native/cpp/DataLog.cpp @@ -179,7 +179,7 @@ DataLog::DataLog(wpi::Logger& msglog, DataLog::~DataLog() { { std::scoped_lock lock{m_mutex}; - m_active = false; + m_state = kShutdown; m_doFlush = true; } m_cond.notify_all(); @@ -204,12 +204,25 @@ void DataLog::Flush() { void DataLog::Pause() { std::scoped_lock lock{m_mutex}; - m_paused = true; + m_state = kPaused; } void DataLog::Resume() { std::scoped_lock lock{m_mutex}; - m_paused = false; + if (m_state == kPaused) { + m_state = kActive; + } else if (m_state == kStopped) { + m_state = kStart; + } +} + +void DataLog::Stop() { + { + std::scoped_lock lock{m_mutex}; + m_state = kStopped; + m_newFilename.clear(); + } + m_cond.notify_all(); } bool DataLog::HasSchema(std::string_view name) const { @@ -229,12 +242,16 @@ void DataLog::AddSchema(std::string_view name, std::string_view type, if (entryInfo.id != 0) { return; // don't add duplicates } + entryInfo.schemaData.assign(schema.begin(), schema.end()); int entry = StartImpl(fullName, type, {}, timestamp); // inline AppendRaw() without releasing lock if (entry <= 0) { [[unlikely]] return; // should never happen, but check anyway } + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return; + } StartRecord(entry, timestamp, schema.size(), 0); AppendImpl(schema); } @@ -283,105 +300,201 @@ static std::string MakeRandomFilename() { return filename; } -void DataLog::WriterThreadMain(std::string_view dir) { - std::chrono::duration periodTime{m_period}; +struct DataLog::WriterThreadState { + explicit WriterThreadState(std::string_view dir) : dirPath{dir} {} + WriterThreadState(const WriterThreadState&) = delete; + WriterThreadState& operator=(const WriterThreadState&) = delete; + ~WriterThreadState() { Close(); } - std::error_code ec; - fs::path dirPath{dir}; - std::string filename; + void Close() { + if (f != fs::kInvalidFile) { + fs::CloseFile(f); + f = fs::kInvalidFile; + } + } - { - std::scoped_lock lock{m_mutex}; - filename = std::move(m_newFilename); - m_newFilename.clear(); + void SetFilename(std::string_view fn) { + baseFilename = fn; + filename = fn; + path = dirPath / filename; + segmentCount = 1; } - if (filename.empty()) { - filename = MakeRandomFilename(); + void IncrementFilename() { + fs::path basePath{baseFilename}; + filename = fmt::format("{}.{}{}", basePath.stem().string(), ++segmentCount, + basePath.extension().string()); + path = dirPath / filename; } + fs::path dirPath; + std::string baseFilename; + std::string filename; + fs::path path; fs::file_t f = fs::kInvalidFile; + uintmax_t freeSpace = UINTMAX_MAX; + int segmentCount = 1; +}; + +void DataLog::StartLogFile(WriterThreadState& state) { + std::error_code ec; + + if (state.filename.empty()) { + state.SetFilename(MakeRandomFilename()); + } // get free space - uintmax_t freeSpace = fs::space(dirPath).available; - if (freeSpace < kMinFreeSpace) { + auto freeSpaceInfo = fs::space(state.dirPath, ec); + if (!ec) { + state.freeSpace = freeSpaceInfo.available; + } else { + state.freeSpace = UINTMAX_MAX; + } + if (state.freeSpace < kMinFreeSpace) { WPI_ERROR(m_msglog, "Insufficient free space ({} available), no log being saved", - FormatBytesSize(freeSpace)); + FormatBytesSize(state.freeSpace)); } else { // try preferred filename, or randomize it a few times, before giving up for (int i = 0; i < 5; ++i) { // open file for append #ifdef _WIN32 // WIN32 doesn't allow combination of CreateNew and Append - f = fs::OpenFileForWrite(dirPath / filename, ec, fs::CD_CreateNew, - fs::OF_None); + state.f = + fs::OpenFileForWrite(state.path, ec, fs::CD_CreateNew, fs::OF_None); #else - f = fs::OpenFileForWrite(dirPath / filename, ec, fs::CD_CreateNew, - fs::OF_Append); + state.f = + fs::OpenFileForWrite(state.path, ec, fs::CD_CreateNew, fs::OF_Append); #endif if (ec) { WPI_ERROR(m_msglog, "Could not open log file '{}': {}", - (dirPath / filename).string(), ec.message()); + state.path.string(), ec.message()); // try again with random filename - filename = MakeRandomFilename(); + state.SetFilename(MakeRandomFilename()); } else { break; } } - if (f == fs::kInvalidFile) { + if (state.f == fs::kInvalidFile) { WPI_ERROR(m_msglog, "Could not open log file, no log being saved"); } else { - WPI_INFO(m_msglog, "Logging to '{}' ({} free space)", - (dirPath / filename).string(), FormatBytesSize(freeSpace)); + WPI_INFO(m_msglog, "Logging to '{}' ({} free space)", state.path.string(), + FormatBytesSize(state.freeSpace)); } } // write header (version 1.0) - if (f != fs::kInvalidFile) { + if (state.f != fs::kInvalidFile) { const uint8_t header[] = {'W', 'P', 'I', 'L', 'O', 'G', 0, 1}; - WriteToFile(f, header, filename, m_msglog); + WriteToFile(state.f, header, state.filename, m_msglog); uint8_t extraLen[4]; support::endian::write32le(extraLen, m_extraHeader.size()); - WriteToFile(f, extraLen, filename, m_msglog); + WriteToFile(state.f, extraLen, state.filename, m_msglog); if (m_extraHeader.size() > 0) { - WriteToFile(f, + WriteToFile(state.f, {reinterpret_cast(m_extraHeader.data()), m_extraHeader.size()}, - filename, m_msglog); + state.filename, m_msglog); } } +} +void DataLog::WriterThreadMain(std::string_view dir) { + std::chrono::duration periodTime{m_period}; + + WriterThreadState state{dir}; + { + std::scoped_lock lock{m_mutex}; + state.SetFilename(m_newFilename); + m_newFilename.clear(); + } + StartLogFile(state); + + std::error_code ec; std::vector toWrite; int freeSpaceCount = 0; + int checkExistCount = 0; bool blocked = false; + uintmax_t written = 0; std::unique_lock lock{m_mutex}; - while (m_active) { + while (m_state != kShutdown) { bool doFlush = false; auto timeoutTime = std::chrono::steady_clock::now() + periodTime; if (m_cond.wait_until(lock, timeoutTime) == std::cv_status::timeout) { doFlush = true; } - if (!m_newFilename.empty() && f != fs::kInvalidFile) { + if (m_state == kStopped) { + state.Close(); + continue; + } + + bool doStart = false; + + // if file was deleted, recreate it with the same name + if (++checkExistCount >= 10) { + checkExistCount = 0; + lock.unlock(); + bool exists = fs::exists(state.path, ec); + lock.lock(); + if (!ec && !exists) { + state.Close(); + state.IncrementFilename(); + WPI_INFO(m_msglog, "Log file deleted, recreating as fresh log '{}'", + state.filename); + doStart = true; + } + } + + // start new file if file exceeds 1.8 GB + if (written > 1800000000ull) { + state.Close(); + state.IncrementFilename(); + WPI_INFO(m_msglog, "Log file reached 1.8 GB, starting new file '{}'", + state.filename); + doStart = true; + } + + if (m_state == kStart || doStart) { + lock.unlock(); + StartLogFile(state); + lock.lock(); + if (state.f != fs::kInvalidFile) { + // Emit start and schema data records + for (auto&& entryInfo : m_entries) { + AppendStartRecord(entryInfo.second.id, entryInfo.first(), + entryInfo.second.type, + m_entryIds[entryInfo.second.id].metadata, 0); + if (!entryInfo.second.schemaData.empty()) { + StartRecord(entryInfo.second.id, 0, + entryInfo.second.schemaData.size(), 0); + AppendImpl(entryInfo.second.schemaData); + } + } + } + m_state = kActive; + written = 0; + } + + if (!m_newFilename.empty() && state.f != fs::kInvalidFile) { auto newFilename = std::move(m_newFilename); m_newFilename.clear(); - lock.unlock(); // rename - if (filename != newFilename) { - fs::rename(dirPath / filename, dirPath / newFilename, ec); + if (state.filename != newFilename) { + lock.unlock(); + fs::rename(state.path, state.dirPath / newFilename, ec); + lock.lock(); } if (ec) { WPI_ERROR(m_msglog, "Could not rename log file from '{}' to '{}': {}", - filename, newFilename, ec.message()); + state.filename, newFilename, ec.message()); } else { - WPI_INFO(m_msglog, "Renamed log file from '{}' to '{}'", filename, + WPI_INFO(m_msglog, "Renamed log file from '{}' to '{}'", state.filename, newFilename); } - filename = std::move(newFilename); - lock.lock(); + state.SetFilename(newFilename); } if (doFlush || m_doFlush) { @@ -393,39 +506,45 @@ void DataLog::WriterThreadMain(std::string_view dir) { // swap outgoing with empty vector toWrite.swap(m_outgoing); - if (f != fs::kInvalidFile && !blocked) { + if (state.f != fs::kInvalidFile && !blocked) { lock.unlock(); // update free space every 10 flushes (in case other things are writing) if (++freeSpaceCount >= 10) { freeSpaceCount = 0; - freeSpace = fs::space(dirPath).available; + auto freeSpaceInfo = fs::space(state.dirPath, ec); + if (!ec) { + state.freeSpace = freeSpaceInfo.available; + } else { + state.freeSpace = UINTMAX_MAX; + } } // write buffers to file for (auto&& buf : toWrite) { // stop writing when we go below the minimum free space - freeSpace -= buf.GetData().size(); - if (freeSpace < kMinFreeSpace) { + state.freeSpace -= buf.GetData().size(); + written += buf.GetData().size(); + if (state.freeSpace < kMinFreeSpace) { [[unlikely]] WPI_ERROR( m_msglog, "Stopped logging due to low free space ({} available)", - FormatBytesSize(freeSpace)); + FormatBytesSize(state.freeSpace)); blocked = true; break; } - WriteToFile(f, buf.GetData(), filename, m_msglog); + WriteToFile(state.f, buf.GetData(), state.filename, m_msglog); } // sync to storage #if defined(__linux__) - ::fdatasync(f); + ::fdatasync(state.f); #elif defined(__APPLE__) - ::fsync(f); + ::fsync(state.f); #endif lock.lock(); if (blocked) { - [[unlikely]] m_paused = true; + [[unlikely]] m_state = kPaused; } } @@ -439,10 +558,6 @@ void DataLog::WriterThreadMain(std::string_view dir) { toWrite.resize(0); } } - - if (f != fs::kInvalidFile) { - fs::CloseFile(f); - } } void DataLog::WriterThreadMain( @@ -465,7 +580,7 @@ void DataLog::WriterThreadMain( std::vector toWrite; std::unique_lock lock{m_mutex}; - while (m_active) { + while (m_state != kShutdown) { bool doFlush = false; auto timeoutTime = std::chrono::steady_clock::now() + periodTime; if (m_cond.wait_until(lock, timeoutTime) == std::cv_status::timeout) { @@ -521,9 +636,9 @@ int DataLog::StartImpl(std::string_view name, std::string_view type, if (entryInfo.id == 0) { entryInfo.id = ++m_lastId; } - auto& savedCount = m_entryCounts[entryInfo.id]; - ++savedCount; - if (savedCount > 1) { + auto& entryInfo2 = m_entryIds[entryInfo.id]; + ++entryInfo2.count; + if (entryInfo2.count > 1) { if (entryInfo.type != type) { WPI_ERROR(m_msglog, "type mismatch for '{}': was '{}', requested '{}'; ignoring", @@ -533,15 +648,26 @@ int DataLog::StartImpl(std::string_view name, std::string_view type, return entryInfo.id; } entryInfo.type = type; + entryInfo2.metadata = metadata; + + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return entryInfo.id; + } + + AppendStartRecord(entryInfo.id, name, type, metadata, timestamp); + return entryInfo.id; +} + +void DataLog::AppendStartRecord(int id, std::string_view name, + std::string_view type, + std::string_view metadata, int64_t timestamp) { size_t strsize = name.size() + type.size() + metadata.size(); uint8_t* buf = StartRecord(0, timestamp, 5 + 12 + strsize, 5); *buf++ = impl::kControlStart; - wpi::support::endian::write32le(buf, entryInfo.id); + wpi::support::endian::write32le(buf, id); AppendStringImpl(name); AppendStringImpl(type); AppendStringImpl(metadata); - - return entryInfo.id; } void DataLog::Finish(int entry, int64_t timestamp) { @@ -549,15 +675,18 @@ void DataLog::Finish(int entry, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - auto& savedCount = m_entryCounts[entry]; - if (savedCount == 0) { + auto& entryInfo2 = m_entryIds[entry]; + if (entryInfo2.count == 0) { return; } - --savedCount; - if (savedCount != 0) { + --entryInfo2.count; + if (entryInfo2.count != 0) { return; } - m_entryCounts.erase(entry); + m_entryIds.erase(entry); + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return; + } uint8_t* buf = StartRecord(0, timestamp, 5, 5); *buf++ = impl::kControlFinish; wpi::support::endian::write32le(buf, entry); @@ -569,6 +698,10 @@ void DataLog::SetMetadata(int entry, std::string_view metadata, return; } std::scoped_lock lock{m_mutex}; + m_entryIds[entry].metadata = metadata; + if (m_state != kActive && m_state != kPaused) { + [[unlikely]] return; + } uint8_t* buf = StartRecord(0, timestamp, 5 + 4 + metadata.size(), 5); *buf++ = impl::kControlSetMetadata; wpi::support::endian::write32le(buf, entry); @@ -584,7 +717,7 @@ uint8_t* DataLog::Reserve(size_t size) { m_msglog, "outgoing buffers exceeded threshold, pausing logging--" "consider flushing to disk more frequently (smaller period)"); - m_paused = true; + m_state = kPaused; } m_outgoing.emplace_back(); } else { @@ -626,8 +759,8 @@ void DataLog::AppendRaw(int entry, std::span data, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, data.size(), 0); AppendImpl(data); @@ -640,8 +773,8 @@ void DataLog::AppendRaw2(int entry, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } size_t size = 0; for (auto&& chunk : data) { @@ -658,8 +791,8 @@ void DataLog::AppendBoolean(int entry, bool value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 1, 1); buf[0] = value ? 1 : 0; @@ -670,8 +803,8 @@ void DataLog::AppendInteger(int entry, int64_t value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 8, 8); wpi::support::endian::write64le(buf, value); @@ -682,8 +815,8 @@ void DataLog::AppendFloat(int entry, float value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 4, 4); if constexpr (wpi::support::endian::system_endianness() == @@ -699,8 +832,8 @@ void DataLog::AppendDouble(int entry, double value, int64_t timestamp) { return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, 8, 8); if constexpr (wpi::support::endian::system_endianness() == @@ -724,8 +857,8 @@ void DataLog::AppendBooleanArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size(), 0); uint8_t* buf; @@ -748,8 +881,8 @@ void DataLog::AppendBooleanArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size(), 0); uint8_t* buf; @@ -783,8 +916,8 @@ void DataLog::AppendIntegerArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size() * 8, 0); uint8_t* buf; @@ -816,8 +949,8 @@ void DataLog::AppendFloatArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size() * 4, 0); uint8_t* buf; @@ -849,8 +982,8 @@ void DataLog::AppendDoubleArray(int entry, std::span arr, return; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } StartRecord(entry, timestamp, arr.size() * 8, 0); uint8_t* buf; @@ -882,8 +1015,8 @@ void DataLog::AppendStringArray(int entry, std::span arr, size += 4 + str.size(); } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, size, 4); wpi::support::endian::write32le(buf, arr.size()); @@ -905,8 +1038,8 @@ void DataLog::AppendStringArray(int entry, size += 4 + str.size(); } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, size, 4); wpi::support::endian::write32le(buf, arr.size()); @@ -928,8 +1061,8 @@ void DataLog::AppendStringArray(int entry, size += 4 + str.len; } std::scoped_lock lock{m_mutex}; - if (m_paused) { - return; + if (m_state != kActive) { + [[unlikely]] return; } uint8_t* buf = StartRecord(entry, timestamp, size, 4); wpi::support::endian::write32le(buf, arr.size()); @@ -975,6 +1108,10 @@ void WPI_DataLog_Resume(struct WPI_DataLog* datalog) { reinterpret_cast(datalog)->Resume(); } +void WPI_DataLog_Stop(struct WPI_DataLog* datalog) { + reinterpret_cast(datalog)->Stop(); +} + int WPI_DataLog_Start(struct WPI_DataLog* datalog, const char* name, const char* type, const char* metadata, int64_t timestamp) { diff --git a/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp b/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp index cbc233dc506..c78c891ea54 100644 --- a/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp +++ b/wpiutil/src/main/native/cpp/jni/DataLogJNI.cpp @@ -111,6 +111,22 @@ Java_edu_wpi_first_util_datalog_DataLogJNI_resume reinterpret_cast(impl)->Resume(); } +/* + * Class: edu_wpi_first_util_datalog_DataLogJNI + * Method: stop + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_edu_wpi_first_util_datalog_DataLogJNI_stop + (JNIEnv* env, jclass, jlong impl) +{ + if (impl == 0) { + wpi::ThrowNullPointerException(env, "impl is null"); + return; + } + reinterpret_cast(impl)->Stop(); +} + /* * Class: edu_wpi_first_util_datalog_DataLogJNI * Method: addSchema diff --git a/wpiutil/src/main/native/include/wpi/DataLog.h b/wpiutil/src/main/native/include/wpi/DataLog.h index 9e53bdcfa63..99db9646280 100644 --- a/wpiutil/src/main/native/include/wpi/DataLog.h +++ b/wpiutil/src/main/native/include/wpi/DataLog.h @@ -174,10 +174,18 @@ class DataLog final { void Pause(); /** - * Resumes appending of data records to the log. + * Resumes appending of data records to the log. If called after Stop(), + * opens a new file (with random name if SetFilename was not called after + * Stop()) and appends Start records and schema data values for all previously + * started entries and schemas. */ void Resume(); + /** + * Stops appending all records to the log, and closes the log file. + */ + void Stop(); + /** * Returns whether there is a data schema already registered with the given * name. @@ -456,6 +464,9 @@ class DataLog final { int64_t timestamp); private: + struct WriterThreadState; + + void StartLogFile(WriterThreadState& state); void WriterThreadMain(std::string_view dir); void WriterThreadMain( std::function data)> write); @@ -468,13 +479,20 @@ class DataLog final { uint8_t* Reserve(size_t size); void AppendImpl(std::span data); void AppendStringImpl(std::string_view str); + void AppendStartRecord(int id, std::string_view name, std::string_view type, + std::string_view metadata, int64_t timestamp); wpi::Logger& m_msglog; mutable wpi::mutex m_mutex; wpi::condition_variable m_cond; - bool m_active{true}; bool m_doFlush{false}; - bool m_paused{false}; + enum State { + kStart, + kActive, + kPaused, + kStopped, + kShutdown, + } m_state = kActive; double m_period; std::string m_extraHeader; std::string m_newFilename; @@ -483,10 +501,15 @@ class DataLog final { std::vector m_outgoing; struct EntryInfo { std::string type; + std::vector schemaData; // only set for schema entries int id{0}; }; wpi::StringMap m_entries; - wpi::DenseMap m_entryCounts; + struct EntryInfo2 { + std::string metadata; + unsigned int count; + }; + wpi::DenseMap m_entryIds; int m_lastId = 0; std::thread m_thread; }; @@ -1113,12 +1136,22 @@ void WPI_DataLog_Flush(struct WPI_DataLog* datalog); void WPI_DataLog_Pause(struct WPI_DataLog* datalog); /** - * Resumes appending of data records to the log. + * Resumes appending of data records to the log. If called after Stop(), + * opens a new file (with random name if SetFilename was not called after + * Stop()) and appends Start records and schema data values for all previously + * started entries and schemas. * * @param datalog data log */ void WPI_DataLog_Resume(struct WPI_DataLog* datalog); +/** + * Stops appending all records to the log, and closes the log file. + * + * @param datalog data log + */ +void WPI_DataLog_Stop(struct WPI_DataLog* datalog); + /** * Start an entry. Duplicate names are allowed (with the same type), and * result in the same index being returned (Start/Finish are reference