Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create LcmLogSink, a VectorLogSink analog implemented as an lcm interface #358

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions lcm/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ cc_library(
],
)

cc_library(
name = "lcm_log_sink",
srcs = ["lcm_log_sink.cc"],
hdrs = ["lcm_log_sink.h"],
deps = [
"@drake//:drake_shared_library",
"@lcm",
],
)

cc_library(
name = "lcm_trajectory_saver",
srcs = ["lcm_trajectory.cc"],
Expand Down
140 changes: 140 additions & 0 deletions lcm/lcm_log_sink.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include "lcm_log_sink.h"

#include <memory>
#include <map>
#include <stdexcept>
#include <vector>

#include "lcm/lcm.h"

#include "drake/common/drake_assert.h"

namespace dairlib {
namespace lcm {

using drake::lcm::DrakeLcmInterface;
using drake::lcm::DrakeSubscriptionInterface;

using HandlerFunction = DrakeLcmInterface::HandlerFunction;
using MultichannelHandlerFunction =
DrakeLcmInterface::MultichannelHandlerFunction;

class LcmLogSink::Impl {
public:
std::vector<::lcm_eventlog_event_t> event_buf_{};

void Append(const std::string& channel, const void* data,
int data_size, unsigned long timestamp) {
::lcm_eventlog_event_t log_event{};
log_event.timestamp = timestamp;
log_event.channellen = channel.size();
log_event.channel = static_cast<char*>(malloc(sizeof(char) * (channel.size() + 1)));
DRAKE_DEMAND(log_event.channel != nullptr);
strcpy(log_event.channel, channel.c_str());
log_event.datalen = data_size;
log_event.data = malloc(data_size);
DRAKE_DEMAND(log_event.data != nullptr);
memcpy(log_event.data, data, data_size);
event_buf_.push_back(log_event);
}

void WriteLog(const std::string& filename) {
lcm_eventlog_t* log = ::lcm_eventlog_create(filename.c_str(), "w");
if (log == nullptr) {
throw std::logic_error("Couldn't create lcm log " + filename);
}
for (::lcm_eventlog_event_t& event: event_buf_) {
int status = ::lcm_eventlog_write_event(log, &event);
if (status != 0) {
throw std::logic_error("Message write failure");
}
}
::lcm_eventlog_destroy(log);
}

void FreeBuf() {
for (auto& e: event_buf_) {
if (e.channel != nullptr) {
free(e.channel);
e.channel = nullptr;
}
if (e.data != nullptr) {
free(e.data);
e.data = nullptr;
}
}
event_buf_.clear();
}

~Impl() { FreeBuf(); }
};

LcmLogSink::LcmLogSink(bool overwrite_publish_time_with_system_clock):
overwrite_publish_time_with_system_clock_(
overwrite_publish_time_with_system_clock),
url_("lcmlogsink//:logsink"),
impl_(new Impl) {}

LcmLogSink::~LcmLogSink() = default;

std::string LcmLogSink::get_lcm_url() const {
return url_;
}

void LcmLogSink::Publish(const std::string& channel, const void* data,
int data_size, std::optional<double> time_sec) {
unsigned long timestamp;
if (!overwrite_publish_time_with_system_clock_) {
timestamp = second_to_timestamp(time_sec.value_or(0.0));
} else {
timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
impl_->Append(channel, data, data_size, timestamp);
}

void LcmLogSink::WriteLog(const std::string &fname) {
impl_->WriteLog(fname);
}
void LcmLogSink::clear() {
impl_->FreeBuf();
}

std::shared_ptr<DrakeSubscriptionInterface> LcmLogSink::Subscribe(
const std::string& channel, HandlerFunction handler) {
throw std::logic_error("You are trying to subscribe to an LCM channel using"
" LcmLogSink, which is a write only implementation of "
"DrakeLcmInterface.");
return nullptr;
}

std::shared_ptr<DrakeSubscriptionInterface> LcmLogSink::SubscribeMultichannel(
std::string_view /* regex */, MultichannelHandlerFunction /* handler */) {
throw std::logic_error("You are trying to subscribe to an LCM channel using"
" LcmLogSink, which is a write only implementation of "
"DrakeLcmInterface.");
return nullptr;
}

std::shared_ptr<DrakeSubscriptionInterface> LcmLogSink::SubscribeAllChannels(
MultichannelHandlerFunction handler) {
throw std::logic_error("You are trying to subscribe to an LCM channel using"
" LcmLogSink, which is a write only implementation of "
"DrakeLcmInterface.");
return nullptr;
}

int LcmLogSink::HandleSubscriptions(int) {
throw std::logic_error("You are trying to subscribe to an LCM channel using"
" LcmLogSink, which is a write only implementation of "
"DrakeLcmInterface.");
return 0;
}

void LcmLogSink::OnHandleSubscriptionsError(const std::string& error_message) {
// We are not called via LCM C code, so it's safe to throw there.
throw std::runtime_error(error_message);
}

} // namespace lcm
} // namespace drake
124 changes: 124 additions & 0 deletions lcm/lcm_log_sink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#pragma once

#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>

#include "drake/common/drake_copyable.h"
#include "drake/lcm/drake_lcm_interface.h"

namespace dairlib {
namespace lcm {

/**
* A LCM interface for logging LCM messages to a file. Contains an internal
* buffer used to store lcm_eventlog events until the user calls WriteLog to
* write an lcm log to disk. Reimplements `DrakeLcmLog` as a write only
* interface and avoids writing to disk on every call to Publish().
*/
class LcmLogSink : public drake::lcm::DrakeLcmInterface {
public:
DRAKE_NO_COPY_NO_MOVE_NO_ASSIGN(LcmLogSink);

/**
* Constructs an LcmLogSink.
* @param overwrite_publish_time_with_system_clock If true, override the
* `second` parameter passed to Publish method, and use host system's clock
* to generate the timestamp for the logged message. This is used to mimic
* lcm-logger's behavior. It also implicitly records how fast the messages
* are generated in real time.
*
*/
LcmLogSink(bool overwrite_publish_time_with_system_clock = false);

~LcmLogSink() override;

/**
* Writes an entry that occurred at @p timestamp with content @p data to the
* internal event log buffer.
* @param channel Channel name.
* @param data Pointer to raw bytes.
* @param data_size Number of bytes in @p data.
* @param time_sec Time in seconds when the message is published. Since
* messages are save to the log file in the order of Publish calls, this
* function should only be called with non-decreasing @p second. Note that
* this parameter can be overwritten by the host system's clock if
* `overwrite_publish_time_with_system_clock` is true at construction time.
*
* @throws std::exception if a new event cannot be allocated to insert into
* the buffer
*/
void Publish(const std::string& channel, const void* data, int data_size,
std::optional<double> time_sec) override;

/**
* Write the contents of this LcmLogSink to disk as an lcm log.
* @param fname the filename of the lcmlog to write.
*/
void WriteLog(const std::string& fname);

/**
* Erase the contents of this LcmLogSink
*/
void clear();

/**
* Throws an exception because LcmLogSink is write-only
*/
std::shared_ptr<drake::lcm::DrakeSubscriptionInterface> Subscribe(
const std::string& channel, HandlerFunction handler) override;

/** Throws an exception because LcmLogSink is write-only */
std::shared_ptr<drake::lcm::DrakeSubscriptionInterface> SubscribeMultichannel(
std::string_view regex, MultichannelHandlerFunction) override;

/**
* Throws an exception because LcmLogSink is write-only.
*/
std::shared_ptr<drake::lcm::DrakeSubscriptionInterface> SubscribeAllChannels(
MultichannelHandlerFunction) override;

/**
* Throws an exception because LcmLogSink is write-only
*/
int HandleSubscriptions(int) override;


/**
* Converts @p timestamp (in microseconds) to time (in seconds) relative to
* the starting time passed to the constructor.
*/
double timestamp_to_second(uint64_t timestamp) const {
return static_cast<double>(timestamp) / 1e6;
}

/**
* Converts time (in seconds) relative to the starting time passed to the
* constructor to a timestamp in microseconds.
*/
uint64_t second_to_timestamp(double sec) const {
return static_cast<uint64_t>(sec * 1e6);
}

std::string get_lcm_url() const override;

private:
void OnHandleSubscriptionsError(const std::string&) override;

const bool overwrite_publish_time_with_system_clock_;
const std::string url_;

// TODO(jwnimmer-tri) It is not clear to me why this class needs a mutex
// (i.e., where multiple threads are coming from). That factor needs to be
// re-discovered and then documented somewhere.

// This mutex guards access to the Impl object.
mutable std::mutex mutex_;
class Impl;
const std::unique_ptr<Impl> impl_;
};

} // namespace lcm
} // namespace drake