feat(clp-s): Record log-order at compression time. (y-scope#584)
Co-authored-by: wraymo <[email protected]>
gibber9809 and wraymo authored Nov 19, 2024
1 parent d969aaf commit 12a5f8d
Showing 26 changed files with 377 additions and 91 deletions.
9 changes: 9 additions & 0 deletions components/core/src/clp_s/ArchiveReader.cpp
@@ -27,6 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_log_event_idx_column_id = m_schema_tree->get_metadata_field_id(constants::cLogEventIdxName);

m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
}
@@ -310,6 +312,12 @@ void ArchiveReader::initialize_schema_reader(
}
BaseColumnReader* column_reader = append_reader_column(reader, column_id);

if (column_id == m_log_event_idx_column_id
&& nullptr != dynamic_cast<Int64ColumnReader*>(column_reader))
{
reader.mark_column_as_log_event_idx(static_cast<Int64ColumnReader*>(column_reader));
}

if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
{
reader.mark_column_as_timestamp(column_reader);
@@ -346,6 +354,7 @@ void ArchiveReader::close() {
m_cur_stream_id = 0;
m_stream_buffer.reset();
m_stream_buffer_size = 0ULL;
m_log_event_idx_column_id = -1;
}

std::shared_ptr<char[]> ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) {
6 changes: 6 additions & 0 deletions components/core/src/clp_s/ArchiveReader.hpp
@@ -142,6 +142,11 @@ class ArchiveReader {
m_projection = projection;
}

/**
* @return true if this archive has log ordering information, and false otherwise.
*/
bool has_log_order() { return m_log_event_idx_column_id >= 0; }

private:
/**
* Initializes a schema reader passed by reference to become a reader for a given schema.
@@ -214,6 +219,7 @@ class ArchiveReader {
std::shared_ptr<char[]> m_stream_buffer{};
size_t m_stream_buffer_size{0ULL};
size_t m_cur_stream_id{0ULL};
int32_t m_log_event_idx_column_id{-1};
};
} // namespace clp_s

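For context, has_log_order() relies on the sentinel value -1 in m_log_event_idx_column_id to detect archives written without ordering information (archives produced before this change, or compressed with --disable-log-order). A minimal caller-side sketch of the intended guard, assuming clp_s/ArchiveReader.hpp is included and that archives_dir and archive_id name an existing archive (this snippet is illustrative and not part of the diff):

// Illustrative guard only (not part of this change).
clp_s::ArchiveReader archive_reader;
archive_reader.open(archives_dir, archive_id);  // open() resolves the log_event_idx metadata column if present
if (archive_reader.has_log_order()) {
    // Ordered extraction is possible: the archive recorded log_event_idx at compression time.
} else {
    // Archive predates this feature or was compressed with --disable-log-order;
    // fall back to unordered extraction.
}
archive_reader.close();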
2 changes: 2 additions & 0 deletions components/core/src/clp_s/ArchiveWriter.cpp
@@ -68,6 +68,7 @@ void ArchiveWriter::close() {
m_encoded_message_size = 0UL;
m_uncompressed_size = 0UL;
m_compressed_size = 0UL;
m_next_log_event_id = 0;
}

void ArchiveWriter::append_message(
@@ -86,6 +87,7 @@
}

m_encoded_message_size += schema_writer->append_message(message);
++m_next_log_event_id;
}

size_t ArchiveWriter::get_data_size() {
9 changes: 8 additions & 1 deletion components/core/src/clp_s/ArchiveWriter.hpp
@@ -1,6 +1,7 @@
#ifndef CLP_S_ARCHIVEWRITER_HPP
#define CLP_S_ARCHIVEWRITER_HPP

#include <string_view>
#include <utility>

#include <boost/filesystem.hpp>
@@ -93,10 +94,15 @@ class ArchiveWriter {
* @param key
* @return the node id
*/
int32_t add_node(int parent_node_id, NodeType type, std::string const& key) {
int32_t add_node(int parent_node_id, NodeType type, std::string_view const key) {
return m_schema_tree.add_node(parent_node_id, type, key);
}

/**
* @return The Id that will be assigned to the next log event when appended to the archive.
*/
int64_t get_next_log_event_id() const { return m_next_log_event_id; }

/**
* Return a schema's Id and add the schema to the
* schema map if it does not already exist.
@@ -174,6 +180,7 @@ class ArchiveWriter {
size_t m_encoded_message_size{};
size_t m_uncompressed_size{};
size_t m_compressed_size{};
int64_t m_next_log_event_id{};

std::string m_id;

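The counter is intentionally read-before-append: get_next_log_event_id() returns the ID that the upcoming append_message() call will consume, and append_message() increments it. A sketch of the intended call order (illustrative only; the local names are placeholders and the append_message() parameters are elided):

// Stamp the record with the ID it will receive, then append it.
auto const log_event_id = archive_writer.get_next_log_event_id();
parsed_message.add_value(log_event_idx_node_id, log_event_id);
archive_writer.append_message(/* schema id, schema, and message arguments elided */);
// append_message() advanced the counter, so the next record receives log_event_id + 1.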
8 changes: 6 additions & 2 deletions components/core/src/clp_s/CommandLineArguments.cpp
@@ -194,6 +194,10 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"structurize-arrays",
po::bool_switch(&m_structurize_arrays),
"Structurize arrays instead of compressing them as clp strings."
)(
"disable-log-order",
po::bool_switch(&m_disable_log_order),
"Do not record log order at ingestion time."
);
// clang-format on

@@ -296,13 +300,13 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
decompression_options.add_options()(
"ordered",
po::bool_switch(&m_ordered_decompression),
"Enable decompression in ascending timestamp order for this archive"
"Enable decompression in log order for this archive"
)(
"ordered-chunk-size",
po::value<size_t>(&m_ordered_chunk_size)
->default_value(m_ordered_chunk_size),
"Number of records to include in each output file when decompressing records "
"in ascending timestamp order"
"in log order"
);
// clang-format on
extraction_options.add(decompression_options);
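In terms of usage, compression might then be invoked along the lines of clp-s c <archives-dir> <input.json> --disable-log-order when ordering information is not wanted, and extraction along the lines of clp-s x <archives-dir> <output-dir> --ordered --ordered-chunk-size 100000 to emit chunked output in log order. The invocation shapes shown here are illustrative; only the option names and help text above come from this change.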
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
@@ -112,6 +112,8 @@ class CommandLineArguments {

std::vector<std::string> const& get_projection_columns() const { return m_projection_columns; }

bool get_record_log_order() const { return false == m_disable_log_order; }

private:
// Methods
/**
@@ -178,6 +180,7 @@ class CommandLineArguments {
bool m_ordered_decompression{false};
size_t m_ordered_chunk_size{0};
size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB
bool m_disable_log_order{false};

// Metadata db variables
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
28 changes: 18 additions & 10 deletions components/core/src/clp_s/JsonConstructor.cpp
@@ -48,7 +48,13 @@ void JsonConstructor::store() {
m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_option.archives_dir, m_option.archive_id);
m_archive_reader->read_dictionaries_and_metadata();
if (false == m_option.ordered) {

if (m_option.ordered && false == m_archive_reader->has_log_order()) {
SPDLOG_WARN("This archive is missing ordering information and cannot be decompressed in "
"log order. Falling back to out-of-order decompression.");
}

if (false == m_option.ordered || false == m_archive_reader->has_log_order()) {
FileWriter writer;
writer.open(
m_option.output_dir + "/original",
@@ -68,15 +74,15 @@ void JsonConstructor::construct_in_order() {
auto tables = m_archive_reader->read_all_tables();
using ReaderPointer = std::shared_ptr<SchemaReader>;
auto cmp = [](ReaderPointer& left, ReaderPointer& right) {
return left->get_next_timestamp() > right->get_next_timestamp();
return left->get_next_log_event_idx() > right->get_next_log_event_idx();
};
std::priority_queue record_queue(tables.begin(), tables.end(), cmp);
// Clear tables vector so that memory gets deallocated after we have marshalled all records for
// a given table
tables.clear();

epochtime_t first_timestamp{0};
epochtime_t last_timestamp{0};
int64_t first_idx{0};
int64_t last_idx{0};
size_t num_records_marshalled{0};
auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
FileWriter writer;
@@ -97,9 +103,11 @@

std::vector<bsoncxx::document::value> results;
auto finalize_chunk = [&](bool open_new_writer) {
// Add one to last_idx to match clp's behaviour of having the end index be exclusive
++last_idx;
writer.close();
std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_"
+ std::to_string(last_timestamp) + ".jsonl";
std::string new_file_name = src_path.string() + "_" + std::to_string(first_idx) + "_"
+ std::to_string(last_idx) + ".jsonl";
auto new_file_path = std::filesystem::path(new_file_name);
std::error_code ec;
std::filesystem::rename(src_path, new_file_path, ec);
@@ -119,11 +127,11 @@
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cBeginMsgIx,
static_cast<int64_t>(first_timestamp)
first_idx
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cEndMsgIx,
static_cast<int64_t>(last_timestamp)
last_idx
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cIsLastIrChunk,
@@ -140,9 +148,9 @@
while (false == record_queue.empty()) {
ReaderPointer next = record_queue.top();
record_queue.pop();
last_timestamp = next->get_next_timestamp();
last_idx = next->get_next_log_event_idx();
if (0 == num_records_marshalled) {
first_timestamp = last_timestamp;
first_idx = last_idx;
}
next->get_next_message(buffer);
if (false == next->done()) {
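The construct_in_order() changes amount to a k-way merge over the per-table readers, keyed on each reader's next log-event index instead of its next timestamp. A condensed sketch of the merge loop, with a simplified Reader standing in for SchemaReader (illustrative only; record marshalling, chunking, and the results-cache bookkeeping are omitted, and each reader is assumed to hold at least one record):

#include <memory>
#include <queue>
#include <vector>

// Simplified stand-in for SchemaReader: yields records tagged with a log-event index.
struct Reader {
    std::vector<int64_t> log_event_indices;
    size_t pos{0};

    int64_t get_next_log_event_idx() const { return log_event_indices[pos]; }
    void get_next_message() { ++pos; }  // Marshalling elided
    bool done() const { return pos >= log_event_indices.size(); }
};

void merge_in_log_order(std::vector<std::shared_ptr<Reader>> tables) {
    using ReaderPointer = std::shared_ptr<Reader>;
    // Greater-than comparison turns std::priority_queue (a max-heap) into a min-heap,
    // so the reader holding the smallest next index is always on top.
    auto cmp = [](ReaderPointer const& left, ReaderPointer const& right) {
        return left->get_next_log_event_idx() > right->get_next_log_event_idx();
    };
    std::priority_queue record_queue(tables.begin(), tables.end(), cmp);

    while (false == record_queue.empty()) {
        ReaderPointer next = record_queue.top();
        record_queue.pop();
        // Chunk boundaries (first_idx/last_idx) would be tracked here for output naming.
        next->get_next_message();
        if (false == next->done()) {
            record_queue.push(next);  // Re-insert so this reader's next record competes again.
        }
    }
}

The real construct_in_order() additionally renames each finished chunk to <archive-id>_<first-idx>_<last-idx>.jsonl and, when writing to a results cache, reports those indices via cBeginMsgIx/cEndMsgIx, as shown in the hunks above.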
2 changes: 1 addition & 1 deletion components/core/src/clp_s/JsonConstructor.hpp
@@ -66,7 +66,7 @@ class JsonConstructor {
private:
/**
* Reads all of the tables from m_archive_reader and writes all of the records
* they contain to writer in timestamp order.
* they contain to writer in log order.
*/
void construct_in_order();

34 changes: 32 additions & 2 deletions components/core/src/clp_s/JsonParser.cpp
@@ -15,7 +15,8 @@ JsonParser::JsonParser(JsonParserOption const& option)
m_target_encoded_size(option.target_encoded_size),
m_max_document_size(option.max_document_size),
m_timestamp_key(option.timestamp_key),
m_structurize_arrays(option.structurize_arrays) {
m_structurize_arrays(option.structurize_arrays),
m_record_log_order(option.record_log_order) {
if (false == FileUtils::validate_path(option.file_paths)) {
exit(1);
}
@@ -447,6 +448,16 @@ bool JsonParser::parse() {
m_num_messages = 0;
size_t bytes_consumed_up_to_prev_archive = 0;
size_t bytes_consumed_up_to_prev_record = 0;

int32_t log_event_idx_node_id{};
auto add_log_event_idx_node = [&]() {
if (m_record_log_order) {
log_event_idx_node_id
= add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
}
};
add_log_event_idx_node();

while (json_file_iterator.get_json(json_it)) {
m_current_schema.clear();

@@ -467,11 +478,20 @@
return false;
}

// Add log_event_idx field to metadata for record
if (m_record_log_order) {
m_current_parsed_message.add_value(
log_event_idx_node_id,
m_archive_writer->get_next_log_event_id()
);
m_current_schema.insert_ordered(log_event_idx_node_id);
}

// Some errors from simdjson are latent until trying to access invalid JSON fields.
// Instead of checking for an error every time we access a JSON field in parse_line we
// just catch simdjson_error here instead.
try {
parse_line(ref.value(), -1, "");
parse_line(ref.value(), constants::cRootNodeId, constants::cRootNodeName);
} catch (simdjson::simdjson_error& error) {
SPDLOG_ERROR(
"Encountered error - {} - while trying to parse {} after parsing {} bytes",
@@ -496,6 +516,7 @@
);
bytes_consumed_up_to_prev_archive = bytes_consumed_up_to_prev_record;
split_archive();
add_log_event_idx_node();
}

m_current_parsed_message.clear();
@@ -526,6 +547,15 @@ bool JsonParser::parse() {
return true;
}

int32_t JsonParser::add_metadata_field(std::string_view const field_name, NodeType type) {
auto metadata_subtree_id = m_archive_writer->add_node(
constants::cRootNodeId,
NodeType::Metadata,
constants::cMetadataSubtreeName
);
return m_archive_writer->add_node(metadata_subtree_id, type, field_name);
}

void JsonParser::store() {
m_archive_writer->close();
}
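Pulling the scattered hunks together: once per archive (and again after each split_archive() call), the parser registers a log_event_idx column under a metadata subtree of the merged parse tree (MPT); each record then stores the writer's next log-event ID in that column before the message is appended. The node layout produced by add_metadata_field() is roughly the following (a sketch inferred from the calls above, not an excerpt from the serialized tree):

// <root>                  constants::cRootNodeId / constants::cRootNodeName
//   metadata subtree      NodeType::Metadata, constants::cMetadataSubtreeName
//     log_event_idx       NodeType::Integer, constants::cLogEventIdxName
//
// Each record carries one Integer value in this column: the ID returned by
// ArchiveWriter::get_next_log_event_id() at parse time, which the reader later
// uses to reconstruct log order.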
23 changes: 17 additions & 6 deletions components/core/src/clp_s/JsonParser.hpp
@@ -3,6 +3,7 @@

#include <map>
#include <string>
#include <string_view>
#include <variant>
#include <vector>

@@ -30,12 +31,13 @@ struct JsonParserOption {
std::vector<std::string> file_paths;
std::string timestamp_key;
std::string archives_dir;
size_t target_encoded_size;
size_t max_document_size;
size_t min_table_size;
int compression_level;
bool print_archive_stats;
bool structurize_arrays;
size_t target_encoded_size{};
size_t max_document_size{};
size_t min_table_size{};
int compression_level{};
bool print_archive_stats{};
bool structurize_arrays{};
bool record_log_order{true};
std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db;
};

@@ -94,6 +96,14 @@ class JsonParser {
*/
void split_archive();

/**
* Adds an internal field to the MPT and gets its Id.
*
* Note: this method should be called before parsing a record so that internal fields come first
* in each table. This isn't strictly necessary, but it is a nice convention.
*/
int32_t add_metadata_field(std::string_view const field_name, NodeType type);

int m_num_messages;
std::vector<std::string> m_file_paths;

@@ -109,6 +119,7 @@
size_t m_target_encoded_size;
size_t m_max_document_size;
bool m_structurize_arrays{false};
bool m_record_log_order{true};
};
} // namespace clp_s

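Because record_log_order defaults to true, existing callers keep recording order unless the new CLI flag flips it. A plausible wiring from the command-line layer (sketch only; getter names other than get_record_log_order(), and the subset of fields shown, are assumptions for illustration):

clp_s::JsonParserOption option{};
option.file_paths = command_line_arguments.get_file_paths();        // assumed getter name
option.archives_dir = command_line_arguments.get_archives_dir();    // assumed getter name
option.record_log_order = command_line_arguments.get_record_log_order();  // added in this commit
clp_s::JsonParser parser{option};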
7 changes: 4 additions & 3 deletions components/core/src/clp_s/JsonSerializer.hpp
@@ -2,6 +2,7 @@
#define CLP_S_JSONSERIALIZER_HPP

#include <string>
#include <string_view>
#include <vector>

#include "ColumnReader.hpp"
@@ -66,7 +67,7 @@ class JsonSerializer {
return false;
}

void add_special_key(std::string const& key) { m_special_keys.push_back(key); }
void add_special_key(std::string_view const key) { m_special_keys.emplace_back(key); }

void begin_object() {
append_key();
@@ -110,13 +111,13 @@

void append_key() { append_key(m_special_keys[m_special_keys_index++]); }

void append_key(std::string const& key) {
void append_key(std::string_view const key) {
m_json_string += "\"";
m_json_string += key;
m_json_string += "\":";
}

void append_value(std::string const& value) {
void append_value(std::string_view const value) {
m_json_string += value;
m_json_string += ",";
}
2 changes: 1 addition & 1 deletion components/core/src/clp_s/ReaderUtils.cpp
@@ -18,10 +18,10 @@ std::shared_ptr<SchemaTree> ReaderUtils::read_schema_tree(std::string const& arc
throw OperationFailed(error_code, __FILENAME__, __LINE__);
}

std::string key;
for (size_t i = 0; i < num_nodes; i++) {
int32_t parent_id;
size_t key_length;
std::string key;
uint8_t node_type;

error_code = schema_tree_decompressor.try_read_numeric_value(parent_id);