Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/log reader reads all log files #164

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 80 additions & 52 deletions infrastructure/logging/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,22 @@ LogReader::LogReader(const std::string& log_path, const std::vector<std::string>
throw std::runtime_error("Couldn't open metadata file for log: " + log_path);
}

if (channel_names.empty())
{
if (channel_names.empty()) {
for (auto& channel_name : channels_in_log_) {
ChannelState channel;
const std::string file_path = generate_log_file_path(log_path, channel_name, channel.current_file_number);
if (!open_file(file_path, channel.current_file))
{
throw std::runtime_error("Couldn't open file: " + file_path);
if (!get_logfile(channel, channel_name)) {
throw std::runtime_error("Couldn't open file for " + channel_name);
}
channels_.emplace(channel_name, std::move(channel));
}
}
else {
for (auto& channel_name : channel_names) {
auto channel_it = std::find(channels_in_log_.begin(), channels_in_log_.end(), channel_name);
if (channel_it != channels_in_log_.end())
{
if (channel_it != channels_in_log_.end()) {
ChannelState channel;
const std::string file_path = generate_log_file_path(log_path, channel_name, channel.current_file_number);
if (!open_file(file_path, channel.current_file))
{
throw std::runtime_error("Couldn't open file: " + file_path);
if (!get_logfile(channel, channel_name)) {
throw std::runtime_error("Couldn't open file for " + channel_name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may still be useful to include the filename in the throw exception string.

}
channels_.emplace(channel_name, std::move(channel));
}
Expand All @@ -49,69 +43,103 @@ LogReader::LogReader(const std::string& log_path, const std::vector<std::string>
}

LogReader::~LogReader() {
for (auto& channel : channels_)
{
for (auto& channel : channels_) {
channel.second.current_file.close();
}
}

bool LogReader::get_logfile(ChannelState& channel, std::string channel_name) {
if (channel.current_file.is_open()) {
channel.current_file.close();
}
const std::string file_path = generate_log_file_path(log_path_, channel_name, channel.current_file_number);
return open_file(file_path, channel.current_file);

masonturner marked this conversation as resolved.
Show resolved Hide resolved
}

bool LogReader::try_read_next_message(ChannelState& channel, Message& message) {
auto& file = channel.current_file;

uint32_t channel_id;
uint32_t message_length;
file.read((char*) &channel_id, sizeof(uint32_t));
file.read((char*) &message_length, sizeof(uint32_t));

if (!file.good()) {
return false;
}

std::string message_data(message_length, ' ');
file.read(&message_data[0], message_length);
if (!file.good()) {
return false;
}
message.deserialize(message_data);
return true;
}

bool LogReader::read_next_message(const std::string& channel_name, Message& message) {
auto channel_it = channels_.find(channel_name);
if (channel_it != channels_.end()) {
auto& file = channel_it->second.current_file;
if (channel_it == channels_.end()) {
return false;
}
// Try to read a message from the current log file
if (try_read_next_message(channel_it->second, message)) {
return true;
}
// Otherwise get the next logfile if it exists
channel_it->second.current_file_number++;
if (!get_logfile(channel_it->second, channel_name)) {
return false;
}
// If we can't read from next logfile then give up
return try_read_next_message(channel_it->second, message);
}

uint32_t channel_id;
uint32_t message_length;
file.read((char*) &channel_id, sizeof(uint32_t));
file.read((char*) &message_length, sizeof(uint32_t));
bool LogReader::try_read_next_message_raw(ChannelState& channel, std::string& message_data) {
auto& file = channel.current_file;
uint32_t channel_id;
uint32_t message_length;
file.read((char*) &channel_id, sizeof(uint32_t));
file.read((char*) &message_length, sizeof(uint32_t));

if (file.eof()) {
return false;
}
std::string message_data(message_length, ' ');
file.read(&message_data[0], message_length);
if (!file) {
// TODO: Check to see if there's another log file after this one before assuming we've reached the end of the log.
// std::cerr << "Failed to read from file for channel: " << channel_name << ". Reached end of file." << std::endl;
return false;
}
message.deserialize(message_data);
return true;
if (!file.good()) {
return false;
}
return false;

message_data.resize(message_length, ' ');
file.read(&message_data[0], message_length);

return file.good();
}

bool LogReader::read_next_message_raw(const std::string& channel_name, std::string& message_data) {
auto channel_it = channels_.find(channel_name);
if (channel_it != channels_.end()) {
auto& file = channel_it->second.current_file;
uint32_t channel_id;
uint32_t message_length;
file.read((char*) &channel_id, sizeof(uint32_t));
file.read((char*) &message_length, sizeof(uint32_t));
message_data.resize(message_length, ' ');
file.read(&message_data[0], message_length);
if (!file) {
// TODO: Check to see if there's another log file after this one before assuming we've reached the end of the log.
// std::cerr << "Failed to read from file for channel: " << channel_name << ". Reached end of file." << std::endl;
return false;
}
if (channel_it == channels_.end()) {
return false;
}
// Try to read a message from the current log file
if (try_read_next_message_raw(channel_it->second, message_data)) {
return true;
}
return false;
// Otherwise get the next log file if it exists
channel_it->second.current_file_number++;
if (!get_logfile(channel_it->second, channel_name)) {
return false;
}
// If we can't read from the next logfile, then give up
return try_read_next_message_raw(channel_it->second, message_data);
}

bool LogReader::read_metadata(const std::string& log_path, std::vector<std::string>& channel_names) {
const std::string metadata_path = log_path + "/metadata.txt";
std::ifstream metadata_file;
if (!open_file(metadata_path, metadata_file))
{
if (!open_file(metadata_path, metadata_file)) {
return false;
}

std::string line;
while (std::getline(metadata_file, line))
{
while (std::getline(metadata_file, line)) {
channels_in_log_.emplace_back(line);
}
metadata_file.close();
Expand All @@ -125,7 +153,7 @@ bool LogReader::open_file(const std::string& file_path, std::ifstream& file) {
std::cerr << "Could not open file " << file_path << " because " << e.what() << std::endl;
return false;
}
return true;
return file.good();
}

} // namespace jet
4 changes: 3 additions & 1 deletion infrastructure/logging/log_reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ class LogReader {

private:
bool open_file(const std::string& file_path, std::ifstream& file);
bool try_read_next_message(ChannelState& channel, Message& message);
bool try_read_next_message_raw(ChannelState& channel, std::string& message_data);
bool get_logfile(ChannelState& channel, std::string channel_name);
bool read_metadata(const std::string& log_path, std::vector<std::string>& channel_names);

std::string log_path_;
std::unordered_map<std::string, ChannelState> channels_;
std::vector<std::string> channels_in_log_;
Expand Down