Skip to content

Commit

Permalink
Merge pull request #450 from ZeroCM/logger-clarify_transcoder_shard_u…
Browse files Browse the repository at this point in the history
…sage

Logger clarify transcoder shard usage
  • Loading branch information
jbendes authored Sep 18, 2023
2 parents f095013 + 743b1fa commit be001f8
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 67 deletions.
12 changes: 6 additions & 6 deletions tools/cpp/bridge/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ struct Bridge
if (zcmA) delete zcmA;
if (zcmB) delete zcmB;
if (pluginDb) { delete pluginDb; pluginDb = nullptr; }
for (auto& p : plugins) delete p;
}

bool init(int argc, char *argv[])
Expand Down Expand Up @@ -295,15 +296,14 @@ struct Bridge
// Load plugins from path if specified
if (args.plugin_path != "") {
pluginDb = new TranscoderPluginDb(args.plugin_path, args.debug);
vector<const zcm::TranscoderPlugin*> dbPlugins = pluginDb->getPlugins();
if (dbPlugins.empty()) {
auto dbPluginsMeta = pluginDb->getPluginMeta();
if (dbPluginsMeta.empty()) {
cerr << "Couldn't find any plugins. Aborting." << endl;
return false;
}
vector<string> dbPluginNames = pluginDb->getPluginNames();
for (size_t i = 0; i < dbPlugins.size(); ++i) {
plugins.push_back((zcm::TranscoderPlugin*) dbPlugins[i]);
if (args.debug) cout << "Loaded plugin: " << dbPluginNames[i] << endl;
for (auto pmeta : dbPluginsMeta) {
plugins.push_back(pmeta.makeTranscoderPlugin());
if (args.debug) cout << "Loaded plugin: " << pmeta.className << endl;
}
}

Expand Down
83 changes: 54 additions & 29 deletions tools/cpp/logger/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>
#include <vector>
#include <signal.h>
Expand Down Expand Up @@ -301,14 +300,21 @@ struct Logger
queue<zcm::LogEvent*> q;

TranscoderPluginDb* pluginDb = nullptr;
vector<zcm::TranscoderPlugin*> plugins;

vector<vector<zcm::TranscoderPlugin*>> shard_plugins;

Logger() {}

~Logger()
{
if (pluginDb) { delete pluginDb; pluginDb = nullptr; }
if (log) { log->close(); delete log; }
for (auto& s : shard_plugins) {
for (auto& p : s) {
delete p;
}
}

if (log) { log->close(); delete log; }

while (!q.empty()) {
delete[] q.front()->data;
Expand All @@ -325,19 +331,29 @@ struct Logger
if (!openLogfile())
return false;

shard_plugins.resize(args.shards.size());

// Load plugins from path if specified
assert(pluginDb == nullptr);
if (args.plugin_path != "") {
pluginDb = new TranscoderPluginDb(args.plugin_path, args.debug);
vector<const zcm::TranscoderPlugin*> dbPlugins = pluginDb->getPlugins();
if (dbPlugins.empty()) {
auto dbPluginsMeta = pluginDb->getPluginMeta();
if (dbPluginsMeta.empty()) {
cerr << "Couldn't find any plugins. Aborting." << endl;
return false;
}
vector<string> dbPluginNames = pluginDb->getPluginNames();
for (size_t i = 0; i < dbPlugins.size(); ++i) {
plugins.push_back((zcm::TranscoderPlugin*) dbPlugins[i]);
if (args.debug) cout << "Loaded plugin: " << dbPluginNames[i] << endl;
if (shard_plugins.size() > 1) {
cerr << "You are using transcoder plugins with multiple shards." << endl
<< "Reminder that each shard will get its own clone of each" << endl
<< "transcoder plugin and each transcoder plugin will run " << endl
<< "in that shards threads. " << endl
<< "See comments in TranscoderPlugin.hpp for more information" << endl;
}
for (auto pmeta : dbPluginsMeta) {
for (size_t i = 0; i < shard_plugins.size(); ++i) {
shard_plugins[i].push_back(pmeta.makeTranscoderPlugin());
}
if (args.debug) cout << "Loaded plugin: " << pmeta.className << endl;
}
}

Expand Down Expand Up @@ -426,7 +442,8 @@ struct Logger
return true;
}

void handler(const zcm::ReceiveBuffer* rbuf, const string& channel)
void handler(const zcm::ReceiveBuffer* rbuf,
const string& channel, size_t shardNum)
{
vector<zcm::LogEvent*> evts;

Expand All @@ -435,13 +452,13 @@ struct Logger
le->channel = channel;
le->datalen = rbuf->data_size;

if (!plugins.empty()) {
if (!shard_plugins[shardNum].empty()) {
le->data = rbuf->data;

int64_t msg_hash;
__int64_t_decode_array(le->data, 0, 8, &msg_hash, 1);

for (auto& p : plugins) {
for (auto& p : shard_plugins[shardNum]) {
vector<const zcm::LogEvent*> pevts =
p->transcodeEvent((uint64_t) msg_hash, le);
for (auto* evt : pevts)
Expand All @@ -462,28 +479,29 @@ struct Logger
{
unique_lock<mutex> lock{lk};
while (!evts.empty()) {
if (stillRoom) {
zcm::LogEvent* le = evts.back();
if (!le) {
evts.pop_back();
continue;
}
q.push(le);
totalMemoryUsage += le->datalen + le->channel.size() + sizeof(*le);
stillRoom = (args.max_target_memory == 0) ? true :
(totalMemoryUsage + rbuf->data_size < args.max_target_memory);
evts.pop_back();
} else {
ZCM_DEBUG("Dropping message due to enforced memory constraints");
ZCM_DEBUG("Current memory estimations are at %" PRId64 " bytes",
totalMemoryUsage);
if (!stillRoom) {
fprintf(stderr,
"Dropping message due to enforced memory constraints");
fprintf(stderr,
"Current memory estimations are at %" PRId64 " bytes",
totalMemoryUsage);
while (!evts.empty()) {
delete[] evts.back()->data;
delete evts.back();
evts.pop_back();
}
break;
}

// `back` is okay here because all the events in `evts` are from the
// same `rbuf->recv_utime`
zcm::LogEvent* le = evts.back();
evts.pop_back();
if (!le) continue;
q.push(le);
totalMemoryUsage += le->datalen + le->channel.size() + sizeof(*le);
stillRoom = (args.max_target_memory == 0) ? true :
(totalMemoryUsage + rbuf->data_size < args.max_target_memory);
}
}
newEventCond.notify_all();
Expand Down Expand Up @@ -542,6 +560,7 @@ struct Logger
return;
}

// XXX (Bendes): asan reported unsigned integer overflow here
if (args.fflush_interval_ms >= 0 &&
(le->timestamp - last_fflush_time) > (u64)args.fflush_interval_ms * 1000) {
Platform::fflush(log->getFilePtr());
Expand Down Expand Up @@ -583,6 +602,11 @@ struct Logger

Logger logger{};

static void handler(const zcm::ReceiveBuffer* rbuf, const string& channel, void* usr)
{
logger.handler(rbuf, channel, (size_t)usr);
}

void sighandler(int signal)
{
done++;
Expand All @@ -597,7 +621,8 @@ int main(int argc, char *argv[])
if (!logger.init(argc, argv)) return 1;

vector<unique_ptr<zcm::ZCM>> zcms;
for (const auto& s : logger.args.shards) {
for (size_t i = 0; i < logger.args.shards.size(); ++i) {
const auto& s = logger.args.shards[i];
ZCM_DEBUG("Constructing shard with url: %s", s.zcmurl.c_str());
zcms.emplace_back(new zcm::ZCM(s.zcmurl));
if (!zcms.back()->good()) {
Expand All @@ -614,7 +639,7 @@ int main(int argc, char *argv[])

for (const auto& c : s.channels) {
ZCM_DEBUG("Subscribing to : %s", c.c_str());
zcms.back()->subscribe(c, &Logger::handler, &logger);
zcms.back()->subscribe(c, &handler, (void*)i);
}
}

Expand Down
12 changes: 6 additions & 6 deletions tools/cpp/transcoder/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,14 @@ int main(int argc, char* argv[])

vector<zcm::TranscoderPlugin*> plugins;
TranscoderPluginDb pluginDb(args.plugin_path, args.debug);
vector<const zcm::TranscoderPlugin*> dbPlugins = pluginDb.getPlugins();
if (dbPlugins.empty()) {
auto dbPluginsMeta = pluginDb.getPluginMeta();
if (dbPluginsMeta.empty()) {
cerr << "Couldn't find any plugins. Aborting." << endl;
return 1;
}
vector<string> dbPluginNames = pluginDb.getPluginNames();
for (size_t i = 0; i < dbPlugins.size(); ++i) {
plugins.push_back((zcm::TranscoderPlugin*) dbPlugins[i]);
if (args.debug) cout << "Loaded plugin: " << dbPluginNames[i] << endl;
for (auto pmeta : dbPluginsMeta) {
plugins.push_back(pmeta.makeTranscoderPlugin());
if (args.debug) cout << "Loaded plugin: " << pmeta.className << endl;
}

if (args.debug) return 0;
Expand Down Expand Up @@ -166,6 +165,7 @@ int main(int argc, char* argv[])

inlog.close();
outlog.close();
for (auto& p : plugins) delete p;

cout << "Transcoded " << numInEvents << " events into " << numOutEvents << " events" << endl;
return 0;
Expand Down
19 changes: 4 additions & 15 deletions tools/cpp/util/TranscoderPluginDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,13 @@ bool TranscoderPluginDb::findPlugins(const string& libname)
DEBUG("Success loading plugin %s\n", demangled.c_str());
}

for (auto& meta : pluginMeta) {
zcm::TranscoderPlugin* p = (zcm::TranscoderPlugin*) meta.makeTranscoderPlugin();
DEBUG("Added new plugin with address %p\n", p);
plugins.push_back(p);
constPlugins.push_back(plugins.back());
names.push_back(meta.className);
}

DEBUG("Loaded %d plugins from %s\n", (int)plugins.size(), libname.c_str());
DEBUG("Loaded %d plugins from %s\n", (int)pluginMeta.size(), libname.c_str());

return true;
}

std::vector<const zcm::TranscoderPlugin*> TranscoderPluginDb::getPlugins() const
{ return constPlugins; }

std::vector<string> TranscoderPluginDb::getPluginNames() const
{ return names; }
std::vector<TranscoderPluginMetadata> TranscoderPluginDb::getPluginMeta() const
{ return pluginMeta; }

TranscoderPluginDb::TranscoderPluginDb(const string& paths, bool debug) : debug(debug)
{
Expand All @@ -123,4 +112,4 @@ TranscoderPluginDb::TranscoderPluginDb(const string& paths, bool debug) : debug(
}
}

TranscoderPluginDb::~TranscoderPluginDb() { for (auto p : plugins) delete p; }
TranscoderPluginDb::~TranscoderPluginDb() {}
6 changes: 1 addition & 5 deletions tools/cpp/util/TranscoderPluginDb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@ class TranscoderPluginDb
// Note paths is a ":" delimited list of paths just like $PATH
TranscoderPluginDb(const std::string& paths = "", bool debug = false);
~TranscoderPluginDb();
std::vector<const zcm::TranscoderPlugin*> getPlugins() const;
std::vector<std::string> getPluginNames() const;
std::vector<TranscoderPluginMetadata> getPluginMeta() const;

private:
bool findPlugins(const std::string& libname);
bool debug;
std::vector<TranscoderPluginMetadata> pluginMeta;
std::vector<zcm::TranscoderPlugin*> plugins;
std::vector<std::string> names;
std::vector<const zcm::TranscoderPlugin*> constPlugins;
};
22 changes: 16 additions & 6 deletions zcm/tools/TranscoderPlugin.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <string>
#include <cstdint>
#include <string>

#include "zcm/zcm-cpp.hpp"

Expand Down Expand Up @@ -41,7 +41,7 @@ class TranscoderPlugin
{
public:
static inline std::vector<const LogEvent*> TYPE_NOT_HANDLED() { return {}; }
static inline std::vector<const LogEvent*> TYPE_NO_RECORD() {return {nullptr};};
static inline std::vector<const LogEvent*> TYPE_NO_RECORD() { return { nullptr }; };

// Must declare the following function for your plugin. Unable to enforce
// declaration of static functions in inheritance, so this API trusts you
Expand All @@ -50,7 +50,6 @@ class TranscoderPlugin

virtual ~TranscoderPlugin() {}

//
// hash is the hash of the type encoded inside the event
//
// if (hash == msg_t::getHash()) {
Expand All @@ -75,13 +74,24 @@ class TranscoderPlugin
// return { &newEvt };
//
// return a vector of events you want to transcode this event into.
// return TranscoderPlugin::transcodeEvent(hash,evt) to not transcode
// this event into the output log
// return TYPE_NO_RECORD to not transcode this event into the output log
// return TYPE_NOT_HANDLED if this transcoder should not affect this log event
//
// This function will only be called from a single thread.
//
// When used in zcm-logger, multiple instances of your transcoder may be instantiated
// (one per logging shard). If those instances need to communicate with each other,
// you'll have to manage that internally, perhaps by using a static threadsafe
// singleton storage structure.
// ie:
// static atomic_int intSingleton {0};
// class MyTranscoderPlugin : public zcm::TranscoderPlugin {
// ...
// }
virtual std::vector<const LogEvent*> transcodeEvent(int64_t hash, const LogEvent* evt)
{
return TYPE_NO_RECORD();
}
};

}
} // namespace zcm

0 comments on commit be001f8

Please sign in to comment.