From 6ad5a1111364841a64eb3e1b8c1f6919fa4fea60 Mon Sep 17 00:00:00 2001 From: Brian-Acosta Date: Mon, 1 Apr 2024 22:41:30 -0400 Subject: [PATCH 1/2] add lcm log sink files --- lcm/BUILD.bazel | 10 ++++ lcm/lcm_log_sink.cc | 140 ++++++++++++++++++++++++++++++++++++++++++++ lcm/lcm_log_sink.h | 124 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 274 insertions(+) create mode 100644 lcm/lcm_log_sink.cc create mode 100644 lcm/lcm_log_sink.h diff --git a/lcm/BUILD.bazel b/lcm/BUILD.bazel index f2bc0e23b..b52781317 100644 --- a/lcm/BUILD.bazel +++ b/lcm/BUILD.bazel @@ -20,6 +20,16 @@ cc_library( ], ) +cc_library( + name = "lcm_log_sink", + srcs = ["lcm_log_sink.cc"], + hdrs = ["lcm_log_sink.h"], + deps = [ + "@drake//:drake_shared_library", + "@lcm", + ], +) + cc_library( name = "lcm_trajectory_saver", srcs = ["lcm_trajectory.cc"], diff --git a/lcm/lcm_log_sink.cc b/lcm/lcm_log_sink.cc new file mode 100644 index 000000000..406a984eb --- /dev/null +++ b/lcm/lcm_log_sink.cc @@ -0,0 +1,140 @@ +#include "lcm_log_sink.h" + +#include +#include +#include +#include + +#include "lcm/lcm.h" + +#include "drake/common/drake_assert.h" + +namespace dairlib { +namespace lcm { + +using drake::lcm::DrakeLcmInterface; +using drake::lcm::DrakeSubscriptionInterface; + +using HandlerFunction = DrakeLcmInterface::HandlerFunction; +using MultichannelHandlerFunction = + DrakeLcmInterface::MultichannelHandlerFunction; + +class LcmLogSink::Impl { + public: + std::vector<::lcm_eventlog_event_t> event_buf_{}; + + void Append(const std::string& channel, const void* data, + int data_size, unsigned long timestamp) { + ::lcm_eventlog_event_t log_event{}; + log_event.timestamp = timestamp; + log_event.channellen = channel.size(); + log_event.channel = static_cast(malloc(sizeof(char) * (channel.size() + 1))); + DRAKE_DEMAND(log_event.channel != nullptr); + strcpy(log_event.channel, channel.c_str()); + log_event.datalen = data_size; + log_event.data = malloc(data_size); + DRAKE_DEMAND(log_event.data != nullptr); + memcpy(log_event.data, data, data_size); + event_buf_.push_back(log_event); + } + + void WriteLog(const std::string& filename) { + lcm_eventlog_t* log = ::lcm_eventlog_create(filename.c_str(), "w"); + if (log == nullptr) { + throw std::logic_error("Couldn't create lcm log " + filename); + } + for (::lcm_eventlog_event_t& event: event_buf_) { + int status = ::lcm_eventlog_write_event(log, &event); + if (status != 0) { + throw std::logic_error("Message write failure"); + } + } + ::lcm_eventlog_destroy(log); + } + + void FreeBuf() { + for (auto& e: event_buf_) { + if (e.channel != nullptr) { + free(e.channel); + e.channel = nullptr; + } + if (e.data != nullptr) { + free(e.data); + e.data = nullptr; + } + } + event_buf_.clear(); + } + + ~Impl() { FreeBuf(); } +}; + +LcmLogSink::LcmLogSink(bool overwrite_publish_time_with_system_clock): + overwrite_publish_time_with_system_clock_( + overwrite_publish_time_with_system_clock), + url_("lcmlogsink//:logsink"), + impl_(new Impl) {} + +LcmLogSink::~LcmLogSink() = default; + +std::string LcmLogSink::get_lcm_url() const { + return url_; +} + +void LcmLogSink::Publish(const std::string& channel, const void* data, + int data_size, std::optional time_sec) { + unsigned long timestamp; + if (!overwrite_publish_time_with_system_clock_) { + timestamp = second_to_timestamp(time_sec.value_or(0.0)); + } else { + timestamp = std::chrono::steady_clock::now().time_since_epoch() / + std::chrono::microseconds(1); + } + impl_->Append(channel, data, data_size, timestamp); +} + +void LcmLogSink::WriteLog(const std::string &fname) { + impl_->WriteLog(fname); +} +void LcmLogSink::clear() { + impl_->FreeBuf(); +} + +std::shared_ptr LcmLogSink::Subscribe( + const std::string& channel, HandlerFunction handler) { + throw std::logic_error("You are trying to subscribe to an LCM channel using" + " LcmLogSink, which is a write only implementation of " + "DrakeLcmInterface."); + return nullptr; +} + +std::shared_ptr LcmLogSink::SubscribeMultichannel( + std::string_view /* regex */, MultichannelHandlerFunction /* handler */) { + throw std::logic_error("You are trying to subscribe to an LCM channel using" + " LcmLogSink, which is a write only implementation of " + "DrakeLcmInterface."); + return nullptr; +} + +std::shared_ptr LcmLogSink::SubscribeAllChannels( + MultichannelHandlerFunction handler) { + throw std::logic_error("You are trying to subscribe to an LCM channel using" + " LcmLogSink, which is a write only implementation of " + "DrakeLcmInterface."); + return nullptr; +} + +int LcmLogSink::HandleSubscriptions(int) { + throw std::logic_error("You are trying to subscribe to an LCM channel using" + " LcmLogSink, which is a write only implementation of " + "DrakeLcmInterface."); + return 0; +} + +void LcmLogSink::OnHandleSubscriptionsError(const std::string& error_message) { + // We are not called via LCM C code, so it's safe to throw there. + throw std::runtime_error(error_message); +} + +} // namespace lcm +} // namespace drake \ No newline at end of file diff --git a/lcm/lcm_log_sink.h b/lcm/lcm_log_sink.h new file mode 100644 index 000000000..7060c1d4d --- /dev/null +++ b/lcm/lcm_log_sink.h @@ -0,0 +1,124 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "drake/common/drake_copyable.h" +#include "drake/lcm/drake_lcm_interface.h" + +namespace dairlib { +namespace lcm { + +/** + * A LCM interface for logging LCM messages to a file. Contains an internal + * buffer used to store lcm_eventlog events until the user calls WriteLog to + * write an lcm log to disk. Reimplements `DrakeLcmLog` as a write only + * interface and avoids writing to disk on every call to Publish(). + */ + class LcmLogSink : public drake::lcm::DrakeLcmInterface { + public: + DRAKE_NO_COPY_NO_MOVE_NO_ASSIGN(LcmLogSink); + + /** + * Constructs an LcmLogSink. + * @param overwrite_publish_time_with_system_clock If true, override the + * `second` parameter passed to Publish method, and use host system's clock + * to generate the timestamp for the logged message. This is used to mimic + * lcm-logger's behavior. It also implicitly records how fast the messages + * are generated in real time. + * + */ + LcmLogSink(bool overwrite_publish_time_with_system_clock = false); + + ~LcmLogSink() override; + + /** + * Writes an entry that occurred at @p timestamp with content @p data to the + * internal event log buffer. + * @param channel Channel name. + * @param data Pointer to raw bytes. + * @param data_size Number of bytes in @p data. + * @param time_sec Time in seconds when the message is published. Since + * messages are save to the log file in the order of Publish calls, this + * function should only be called with non-decreasing @p second. Note that + * this parameter can be overwritten by the host system's clock if + * `overwrite_publish_time_with_system_clock` is true at construction time. + * + * @throws std::exception if a new event cannot be allocated to insert into + * the buffer + */ + void Publish(const std::string& channel, const void* data, int data_size, + std::optional time_sec) override; + + /** + * Write the contents of this LcmLogSink to disk as an lcm log. + * @param fname the filename of the lcmlog to write. + */ + void WriteLog(const std::string& fname); + + /** + * Erase the contents of this LcmLogSink + */ + void clear(); + + /** + * Throws an exception because LcmLogSink is write-only + */ + std::shared_ptr Subscribe( + const std::string& channel, HandlerFunction handler) override; + + /** Throws an exception because LcmLogSink is write-only */ + std::shared_ptr SubscribeMultichannel( + std::string_view regex, MultichannelHandlerFunction) override; + + /** + * Throws an exception because LcmLogSink is write-only. + */ + std::shared_ptr SubscribeAllChannels( + MultichannelHandlerFunction) override; + + /** + * Throws an exception because LcmLogSink is write-only + */ + int HandleSubscriptions(int) override; + + + /** + * Converts @p timestamp (in microseconds) to time (in seconds) relative to + * the starting time passed to the constructor. + */ + double timestamp_to_second(uint64_t timestamp) const { + return static_cast(timestamp) / 1e6; + } + + /** + * Converts time (in seconds) relative to the starting time passed to the + * constructor to a timestamp in microseconds. + */ + uint64_t second_to_timestamp(double sec) const { + return static_cast(sec * 1e6); + } + + std::string get_lcm_url() const override; + + private: + void OnHandleSubscriptionsError(const std::string&) override; + + const bool overwrite_publish_time_with_system_clock_; + const std::string url_; + + // TODO(jwnimmer-tri) It is not clear to me why this class needs a mutex + // (i.e., where multiple threads are coming from). That factor needs to be + // re-discovered and then documented somewhere. + + // This mutex guards access to the Impl object. + mutable std::mutex mutex_; + class Impl; + const std::unique_ptr impl_; +}; + +} // namespace lcm +} // namespace drake \ No newline at end of file From d7558983253cfff92ea105b87d140bc755a99e8c Mon Sep 17 00:00:00 2001 From: Brian Acosta Date: Fri, 3 May 2024 13:50:45 -0400 Subject: [PATCH 2/2] use duration cast --- lcm/lcm_log_sink.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lcm/lcm_log_sink.cc b/lcm/lcm_log_sink.cc index 406a984eb..b13079e34 100644 --- a/lcm/lcm_log_sink.cc +++ b/lcm/lcm_log_sink.cc @@ -87,8 +87,8 @@ void LcmLogSink::Publish(const std::string& channel, const void* data, if (!overwrite_publish_time_with_system_clock_) { timestamp = second_to_timestamp(time_sec.value_or(0.0)); } else { - timestamp = std::chrono::steady_clock::now().time_since_epoch() / - std::chrono::microseconds(1); + timestamp = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); } impl_->Append(channel, data, data_size, timestamp); }