diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 4baaefcee..dbaf91dba 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -51,7 +51,8 @@ void ArchiveReader::open(ArchiveReaderOption& option) { m_var_dict, m_log_dict, m_array_dict, - m_timestamp_dict + m_timestamp_dict, + false ); schema_reader->load(); diff --git a/components/core/src/clp_s/ColumnReader.hpp b/components/core/src/clp_s/ColumnReader.hpp index 0b3d86a65..98d24f728 100644 --- a/components/core/src/clp_s/ColumnReader.hpp +++ b/components/core/src/clp_s/ColumnReader.hpp @@ -5,6 +5,7 @@ #include #include "DictionaryReader.hpp" +#include "SchemaTree.hpp" #include "TimestampDictionaryReader.hpp" #include "Utils.hpp" #include "ZstdDecompressor.hpp" @@ -36,7 +37,7 @@ class BaseColumnReader { int32_t get_id() const { return m_id; } - virtual std::string get_type() { return "base"; } + virtual NodeType get_type() { return NodeType::UNKNOWN; } /** * Extracts a value of the column @@ -63,7 +64,7 @@ class Int64ColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; - std::string get_type() override { return "int"; } + NodeType get_type() override { return NodeType::INTEGER; } std::variant extract_value(uint64_t cur_message ) override; @@ -84,7 +85,7 @@ class FloatColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; - std::string get_type() override { return "float"; } + NodeType get_type() override { return NodeType::FLOAT; } std::variant extract_value(uint64_t cur_message ) override; @@ -105,7 +106,7 @@ class BooleanColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; - std::string get_type() override { return "bool"; } + NodeType get_type() override { return NodeType::BOOLEAN; } std::variant extract_value(uint64_t cur_message ) override; @@ -135,7 +136,7 @@ class ClpStringColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; - std::string get_type() override { return m_is_array ? "array" : "string"; } + NodeType get_type() override { return m_is_array ? NodeType::ARRAY : NodeType::CLPSTRING; } std::variant extract_value(uint64_t cur_message ) override; @@ -182,7 +183,7 @@ class VariableStringColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; - std::string get_type() override { return "string"; } + NodeType get_type() override { return NodeType::VARSTRING; } std::variant extract_value(uint64_t cur_message ) override; @@ -217,7 +218,7 @@ class DateStringColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; - std::string get_type() override { return "string"; } + NodeType get_type() override { return NodeType::DATESTRING; } std::variant extract_value(uint64_t cur_message ) override; @@ -246,7 +247,7 @@ class FloatDateStringColumnReader : public BaseColumnReader { // Methods inherited from BaseColumnReader void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; - std::string get_type() override { return "string"; } + NodeType get_type() override { return NodeType::FLOATDATESTRING; } std::variant extract_value(uint64_t cur_message ) override; diff --git a/components/core/src/clp_s/ColumnWriter.hpp b/components/core/src/clp_s/ColumnWriter.hpp index ebdf3dca0..ae7c9b3ba 100644 --- a/components/core/src/clp_s/ColumnWriter.hpp +++ b/components/core/src/clp_s/ColumnWriter.hpp @@ -220,6 +220,7 @@ class FloatDateStringColumnWriter : public BaseColumnWriter { std::shared_ptr timestamp_dict ) : BaseColumnWriter(name, id), + m_timestamp_dict(std::move(timestamp_dict)) {} // Destructor diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index e33033578..aa19b275a 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -355,6 +355,11 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { po::value(&m_batch_size)->value_name("BATCH_SIZE")-> default_value(m_batch_size), "The number of documents to insert into MongoDB in a batch" + )( + "max-num-results", + po::value(&m_max_num_results)->value_name("MAX_NUM_RESULTS")-> + default_value(m_max_num_results), + "The maximum number of results to output" ); // clang-format on search_options.add(output_options); @@ -421,9 +426,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "MongoDB uri and collection must be specified together" ); } - if (m_batch_size == 0) { + if (0 == m_batch_size) { throw std::invalid_argument("Batch size must be greater than 0"); } + if (0 == m_max_num_results) { + throw std::invalid_argument("Max number of results must be greater than 0"); + } m_mongodb_enabled = true; } diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 8c43aaaa5..c898f4781 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -56,6 +56,8 @@ class CommandLineArguments { uint64_t get_batch_size() const { return m_batch_size; } + uint64_t get_max_num_results() const { return m_max_num_results; } + std::string const& get_query() const { return m_query; } std::optional get_search_begin_ts() const { return m_search_begin_ts; } @@ -97,6 +99,7 @@ class CommandLineArguments { std::string m_mongodb_uri; std::string m_mongodb_collection; uint64_t m_batch_size{1000}; + uint64_t m_max_num_results{1000}; // Search variables std::string m_query; diff --git a/components/core/src/clp_s/ReaderUtils.cpp b/components/core/src/clp_s/ReaderUtils.cpp index 2f21cb195..0a35e3cfc 100644 --- a/components/core/src/clp_s/ReaderUtils.cpp +++ b/components/core/src/clp_s/ReaderUtils.cpp @@ -183,6 +183,58 @@ std::vector ReaderUtils::get_schemas(std::string const& archive_path) { return schemas; } +BaseColumnReader* ReaderUtils::append_reader_column( + SchemaReader* reader, + int32_t column_id, + std::shared_ptr const& schema_tree, + std::shared_ptr const& var_dict, + std::shared_ptr const& log_dict, + std::shared_ptr const& array_dict, + std::shared_ptr const& timestamp_dict +) { + BaseColumnReader* column_reader = nullptr; + auto node = schema_tree->get_node(column_id); + std::string key_name = node->get_key_name(); + switch (node->get_type()) { + case NodeType::INTEGER: + column_reader = new Int64ColumnReader(key_name, column_id); + break; + case NodeType::FLOAT: + column_reader = new FloatColumnReader(key_name, column_id); + break; + case NodeType::CLPSTRING: + column_reader = new ClpStringColumnReader(key_name, column_id, var_dict, log_dict); + break; + case NodeType::VARSTRING: + column_reader = new VariableStringColumnReader(key_name, column_id, var_dict); + break; + case NodeType::BOOLEAN: + reader->append_column(new BooleanColumnReader(key_name, column_id)); + break; + case NodeType::ARRAY: + column_reader + = new ClpStringColumnReader(key_name, column_id, var_dict, array_dict, true); + break; + case NodeType::DATESTRING: + column_reader = new DateStringColumnReader(key_name, column_id, timestamp_dict); + break; + case NodeType::FLOATDATESTRING: + column_reader = new FloatDateStringColumnReader(key_name, column_id); + break; + case NodeType::OBJECT: + case NodeType::NULLVALUE: + reader->append_column(column_id); + break; + case NodeType::UNKNOWN: + break; + } + + if (column_reader) { + reader->append_column(column_reader); + } + return column_reader; +} + void ReaderUtils::append_reader_columns( SchemaReader* reader, Schema const& columns, @@ -190,44 +242,24 @@ void ReaderUtils::append_reader_columns( std::shared_ptr const& var_dict, std::shared_ptr const& log_dict, std::shared_ptr const& array_dict, - std::shared_ptr const& timestamp_dict + std::shared_ptr const& timestamp_dict, + bool extract_timestamp ) { - for (int32_t column : columns) { - auto node = schema_tree->get_node(column); - std::string key_name = node->get_key_name(); - switch (node->get_type()) { - case NodeType::INTEGER: - reader->append_column(new Int64ColumnReader(key_name, column)); - break; - case NodeType::FLOAT: - reader->append_column(new FloatColumnReader(key_name, column)); - break; - case NodeType::CLPSTRING: - reader->append_column( - new ClpStringColumnReader(key_name, column, var_dict, log_dict) - ); - break; - case NodeType::VARSTRING: - reader->append_column(new VariableStringColumnReader(key_name, column, var_dict)); - break; - case NodeType::BOOLEAN: - reader->append_column(new BooleanColumnReader(key_name, column)); - break; - case NodeType::ARRAY: - reader->append_column( - new ClpStringColumnReader(key_name, column, var_dict, array_dict, true) - ); - break; - case NodeType::DATESTRING: - reader->append_column(new DateStringColumnReader(key_name, column, timestamp_dict)); - break; - case NodeType::FLOATDATESTRING: - reader->append_column(new FloatDateStringColumnReader(key_name, column)); - break; - case NodeType::OBJECT: - case NodeType::NULLVALUE: - reader->append_column(column); - break; + auto timestamp_column_ids = timestamp_dict->get_authoritative_timestamp_column_ids(); + + for (int32_t column_id : columns) { + BaseColumnReader* column_reader = append_reader_column( + reader, + column_id, + schema_tree, + var_dict, + log_dict, + array_dict, + timestamp_dict + ); + + if (extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0) { + reader->mark_column_as_timestamp(column_reader); } } } diff --git a/components/core/src/clp_s/ReaderUtils.hpp b/components/core/src/clp_s/ReaderUtils.hpp index ae1d1515d..03f128642 100644 --- a/components/core/src/clp_s/ReaderUtils.hpp +++ b/components/core/src/clp_s/ReaderUtils.hpp @@ -103,6 +103,7 @@ class ReaderUtils { * @param log_dict * @param array_dict * @param timestamp_dict + * @param extract_timestamp */ static void append_reader_columns( SchemaReader* reader, @@ -111,6 +112,29 @@ class ReaderUtils { std::shared_ptr const& var_dict, std::shared_ptr const& log_dict, std::shared_ptr const& array_dict, + std::shared_ptr const& timestamp_dict, + bool extract_timestamp + ); + +private: + /** + * Appends a column to the given schema reader + * @param reader + * @param column_id + * @param schema_tree + * @param var_dict + * @param log_dict + * @param array_dict + * @param timestamp_dict + * @return the appended reader column + */ + static BaseColumnReader* append_reader_column( + SchemaReader* reader, + int32_t column_id, + std::shared_ptr const& schema_tree, + std::shared_ptr const& var_dict, + std::shared_ptr const& log_dict, + std::shared_ptr const& array_dict, std::shared_ptr const& timestamp_dict ); }; diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index b55f6feae..43bcf3042 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -10,9 +10,6 @@ void SchemaReader::close() { for (auto& i : m_columns) { delete i; } - - m_column_map.clear(); - m_global_id_to_local_id.clear(); } void SchemaReader::append_column(BaseColumnReader* column_reader) { @@ -21,6 +18,32 @@ void SchemaReader::append_column(BaseColumnReader* column_reader) { generate_local_tree(column_reader->get_id()); } +void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) { + m_timestamp_column = column_reader; + if (m_timestamp_column->get_type() == NodeType::DATESTRING) { + m_get_timestamp = [this]() { + return static_cast(m_timestamp_column) + ->get_encoded_time(m_cur_message); + }; + } else if (m_timestamp_column->get_type() == NodeType::FLOATDATESTRING) { + m_get_timestamp = [this]() { + double timestamp = static_cast(m_timestamp_column) + ->get_encoded_time(m_cur_message); + return static_cast(timestamp); + }; + } else if (m_timestamp_column->get_type() == NodeType::INTEGER) { + m_get_timestamp = [this]() { + return std::get(m_extracted_values[m_timestamp_column->get_id()]); + }; + } else if (m_timestamp_column->get_type() == NodeType::FLOAT) { + m_get_timestamp = [this]() { + return static_cast( + std::get(m_extracted_values[m_timestamp_column->get_id()]) + ); + }; + } +} + void SchemaReader::append_column(int32_t id) { generate_local_tree(id); } @@ -44,11 +67,7 @@ void SchemaReader::load() { generate_json_template(0); } -bool SchemaReader::get_next_message(std::string& message) { - if (m_cur_message >= m_num_messages) { - return false; - } - +void SchemaReader::generate_json_string() { m_json_serializer->reset(); m_json_serializer->begin_document(); size_t column_id_index = 0; @@ -114,6 +133,14 @@ bool SchemaReader::get_next_message(std::string& message) { } m_json_serializer->end_document(); +} + +bool SchemaReader::get_next_message(std::string& message) { + if (m_cur_message >= m_num_messages) { + return false; + } + + generate_json_string(); message = m_json_serializer->get_serialized_string(); @@ -132,78 +159,42 @@ bool SchemaReader::get_next_message(std::string& message, FilterClass* filter) { continue; } - m_json_serializer->reset(); - m_json_serializer->begin_document(); - size_t column_id_index = 0; - BaseColumnReader* column; - JsonSerializer::Op op; - while (m_json_serializer->get_next_op(op)) { - switch (op) { - case JsonSerializer::Op::BeginObject: { - m_json_serializer->begin_object(); - break; - } - case JsonSerializer::Op::EndObject: { - m_json_serializer->end_object(); - break; - } - case JsonSerializer::Op::AddIntField: { - column = m_reordered_columns[column_id_index++]; - m_json_serializer->append_key(column->get_name()); - m_json_serializer->append_value( - std::to_string(std::get(m_extracted_values[column->get_id()])) - ); - break; - } - case JsonSerializer::Op::AddFloatField: { - column = m_reordered_columns[column_id_index++]; - m_json_serializer->append_key(column->get_name()); - m_json_serializer->append_value( - std::to_string(std::get(m_extracted_values[column->get_id()])) - ); - break; - } - case JsonSerializer::Op::AddBoolField: { - column = m_reordered_columns[column_id_index++]; - m_json_serializer->append_key(column->get_name()); - m_json_serializer->append_value( - std::get(m_extracted_values[column->get_id()]) != 0 ? "true" - : "false" - ); - break; - } - case JsonSerializer::Op::AddStringField: { - column = m_reordered_columns[column_id_index++]; - m_json_serializer->append_key(column->get_name()); - m_json_serializer->append_value_with_quotes( - std::get(m_extracted_values[column->get_id()]) - ); - break; - } - case JsonSerializer::Op::AddArrayField: { - column = m_reordered_columns[column_id_index++]; - m_json_serializer->append_key(column->get_name()); - m_json_serializer->append_value( - std::get(m_extracted_values[column->get_id()]) - ); - break; - } - case JsonSerializer::Op::AddNullField: { - m_json_serializer->append_key(); - m_json_serializer->append_value("null"); - break; - } - } + generate_json_string(); + message = m_json_serializer->get_serialized_string(); + + if (message.back() != '\n') { + message += '\n'; } - m_json_serializer->end_document(); + m_cur_message++; + return true; + } + return false; +} + +bool SchemaReader::get_next_message_with_timestamp( + std::string& message, + epochtime_t& timestamp, + FilterClass* filter +) { + // TODO: If we already get max_num_results messages, we can skip messages + // with the timestamp less than the smallest timestamp in the priority queue + while (m_cur_message < m_num_messages) { + if (false == filter->filter(m_cur_message, m_extracted_values)) { + m_cur_message++; + continue; + } + + generate_json_string(); message = m_json_serializer->get_serialized_string(); if (message.back() != '\n') { message += '\n'; } + timestamp = m_get_timestamp(); + m_cur_message++; return true; } diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index da1ace3c0..d1f564a9e 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -47,6 +47,7 @@ class SchemaReader { explicit SchemaReader(std::shared_ptr schema_tree, int32_t schema_id) : m_num_messages(0), m_cur_message(0), + m_get_timestamp([]() -> epochtime_t { return 0; }), m_global_schema_tree(std::move(schema_tree)), m_schema_id(schema_id), m_json_serializer(std::make_shared()) {} @@ -90,19 +91,38 @@ class SchemaReader { bool get_next_message(std::string& message); /** - * Gets next message with a filter + * Gets the next message matching a filter * @param message * @param filter * @return true if there is a next message */ bool get_next_message(std::string& message, FilterClass* filter); + /** + * Gets the next message matching a filter, and its timestamp + * @param message + * @param timestamp + * @param filter + * @return true if there is a next message + */ + bool get_next_message_with_timestamp( + std::string& message, + epochtime_t& timestamp, + FilterClass* filter + ); + /** * Initializes the filter * @param filter */ void initialize_filter(FilterClass* filter); + /** + * Marks a column as timestamp + * @param column_reader + */ + void mark_column_as_timestamp(BaseColumnReader* column_reader); + private: /** * Generates a local schema tree @@ -119,11 +139,9 @@ class SchemaReader { void generate_json_template(int32_t id); /** - * Gets a json pointer string - * @param s - * @return + * Generates a json string from the extracted values */ - static std::string get_json_pointer_string(std::string const& s); + void generate_json_string(); int32_t m_schema_id; std::string m_path; @@ -137,6 +155,9 @@ class SchemaReader { std::vector m_columns; std::vector m_reordered_columns; + BaseColumnReader* m_timestamp_column; + std::function m_get_timestamp; + std::shared_ptr m_global_schema_tree; std::shared_ptr m_local_schema_tree; std::unordered_map m_global_id_to_local_id; diff --git a/components/core/src/clp_s/SchemaTree.hpp b/components/core/src/clp_s/SchemaTree.hpp index 178f80f7c..dbf6f0796 100644 --- a/components/core/src/clp_s/SchemaTree.hpp +++ b/components/core/src/clp_s/SchemaTree.hpp @@ -20,7 +20,8 @@ enum class NodeType : uint8_t { ARRAY, NULLVALUE, DATESTRING, - FLOATDATESTRING + FLOATDATESTRING, + UNKNOWN }; class SchemaNode { diff --git a/components/core/src/clp_s/TimestampDictionaryReader.cpp b/components/core/src/clp_s/TimestampDictionaryReader.cpp index b42860dd3..fdec53ce2 100644 --- a/components/core/src/clp_s/TimestampDictionaryReader.cpp +++ b/components/core/src/clp_s/TimestampDictionaryReader.cpp @@ -44,14 +44,24 @@ void TimestampDictionaryReader::read_new_entries(bool local) { throw OperationFailed(error, __FILENAME__, __LINE__); } - for (int i = 0; i < range_index_size; ++i) { + for (uint64_t i = 0; i < range_index_size; ++i) { TimestampEntry entry; entry.try_read_from_file(m_dictionary_decompressor); + m_entries.emplace_back(std::move(entry)); + std::string column_name = entry.get_key_name(); - TimestampEntry& e = m_column_to_range[column_name] = entry; std::vector tokens; StringUtils::tokenize_column_descriptor(column_name, tokens); - m_tokenized_column_to_range.emplace_back(std::move(tokens), &e); + m_tokenized_column_to_range.emplace_back(std::move(tokens), &m_entries.back()); + + // TODO: Currently, we only allow a single authoritative timestamp column at ingestion time, + // but the timestamp dictionary is designed to store the ranges of several timestamp + // columns. We should enforce a convention that the first entry in the timestamp dictionary + // corresponds to the "authoritative" timestamp column for the dataset. + if (i == 0) { + m_authoritative_timestamp_column_ids = m_entries.back().get_column_ids(); + m_authoritative_timestamp_tokenized_column = tokens; + } } // Local timestamp dictionaries only contain range indices, and @@ -92,15 +102,4 @@ TimestampDictionaryReader::get_string_encoding(epochtime_t epoch, uint64_t forma return ret; } -std::optional> -TimestampDictionaryReader::get_authoritative_timestamp_column() const { - // TODO: Currently, we only allow a single authoritative timestamp column at ingestion time, but - // the timestamp dictionary is designed to store the ranges of several timestamp columns. We - // should enforce a convention that the first entry in the timestamp dictionary corresponds to - // the "authoritative" timestamp column for the dataset. - for (auto it = tokenized_column_to_range_begin(); tokenized_column_to_range_end() != it; ++it) { - return it->first; - } - return std::nullopt; -} } // namespace clp_s diff --git a/components/core/src/clp_s/TimestampDictionaryReader.hpp b/components/core/src/clp_s/TimestampDictionaryReader.hpp index 3ac68ab95..2cca0eeb8 100644 --- a/components/core/src/clp_s/TimestampDictionaryReader.hpp +++ b/components/core/src/clp_s/TimestampDictionaryReader.hpp @@ -72,15 +72,16 @@ class TimestampDictionaryReader { auto tokenized_column_to_range_end() const { return m_tokenized_column_to_range.end(); } - /** - * @return the tokens for the authoritative timestamp column if it exists, or an - * empty option if it does not - */ - std::optional> get_authoritative_timestamp_column() const; + std::optional>& get_authoritative_timestamp_tokenized_column() { + return m_authoritative_timestamp_tokenized_column; + } + + std::unordered_set& get_authoritative_timestamp_column_ids() { + return m_authoritative_timestamp_column_ids; + } private: typedef std::map id_to_pattern_t; - typedef std::map column_to_range_t; typedef std::vector, TimestampEntry*>> tokenized_column_to_range_t; @@ -90,8 +91,11 @@ class TimestampDictionaryReader { ZstdDecompressor m_dictionary_decompressor; id_to_pattern_t m_patterns; - column_to_range_t m_column_to_range; + std::vector m_entries; tokenized_column_to_range_t m_tokenized_column_to_range; + + std::optional> m_authoritative_timestamp_tokenized_column; + std::unordered_set m_authoritative_timestamp_column_ids; }; } // namespace clp_s diff --git a/components/core/src/clp_s/TimestampDictionaryWriter.cpp b/components/core/src/clp_s/TimestampDictionaryWriter.cpp index b38ef83be..af0f3eacf 100644 --- a/components/core/src/clp_s/TimestampDictionaryWriter.cpp +++ b/components/core/src/clp_s/TimestampDictionaryWriter.cpp @@ -126,6 +126,11 @@ epochtime_t TimestampDictionaryWriter::ingest_entry( timestamp_end_pos ); + if (pattern == nullptr) { + throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__); + } + + pattern_id = get_pattern_id(pattern); auto entry = m_local_column_id_to_range.find(node_id); if (entry == m_local_column_id_to_range.end()) { TimestampEntry new_entry(key); @@ -135,12 +140,6 @@ epochtime_t TimestampDictionaryWriter::ingest_entry( entry->second.ingest_timestamp(ret); } - if (pattern == nullptr) { - throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__); - } - - pattern_id = get_pattern_id(pattern); - return ret; } diff --git a/components/core/src/clp_s/TimestampDictionaryWriter.hpp b/components/core/src/clp_s/TimestampDictionaryWriter.hpp index 01e84a80b..d99d95b49 100644 --- a/components/core/src/clp_s/TimestampDictionaryWriter.hpp +++ b/components/core/src/clp_s/TimestampDictionaryWriter.hpp @@ -1,11 +1,12 @@ #ifndef CLP_S_TIMESTAMPDICTIONARYWRITER_HPP #define CLP_S_TIMESTAMPDICTIONARYWRITER_HPP -#include #include #include +#include #include "FileWriter.hpp" +#include "SchemaTree.hpp" #include "TimestampEntry.hpp" #include "TimestampPattern.hpp" #include "ZstdCompressor.hpp" @@ -60,8 +61,21 @@ class TimestampDictionaryWriter { */ void write_local_and_flush_to_disk(); + /** + * Gets the pattern id for a given pattern + * @param pattern + * @return the pattern id + */ uint64_t get_pattern_id(TimestampPattern const* pattern); + /** + * Ingests a timestamp entry + * @param key + * @param node_id + * @param timestamp + * @param pattern_id + * @return the epoch time corresponding to the string timestamp + */ epochtime_t ingest_entry( std::string const& key, int32_t node_id, @@ -69,6 +83,12 @@ class TimestampDictionaryWriter { uint64_t& pattern_id ); + /** + * Ingests a timestamp entry + * @param column_key + * @param node_id + * @param timestamp + */ void ingest_entry(std::string const& key, int32_t node_id, double timestamp); void ingest_entry(std::string const& key, int32_t node_id, int64_t timestamp); @@ -90,7 +110,16 @@ class TimestampDictionaryWriter { epochtime_t get_end_timestamp() const; private: + /** + * Merges the local timestamp ranges into the global timestamp ranges + */ void merge_local_range(); + + /** + * Writes the timestamp entries to the disk + * @param ranges + * @param compressor + */ void write_timestamp_entries( std::map const& ranges, ZstdCompressor& compressor @@ -110,6 +139,7 @@ class TimestampDictionaryWriter { pattern_to_id_t m_pattern_to_id; uint64_t m_next_id{}; + std::map m_global_column_key_to_range; std::map m_local_column_key_to_range; std::unordered_map m_local_column_id_to_range; diff --git a/components/core/src/clp_s/TimestampEntry.cpp b/components/core/src/clp_s/TimestampEntry.cpp index c942f9c66..54b27d22e 100644 --- a/components/core/src/clp_s/TimestampEntry.cpp +++ b/components/core/src/clp_s/TimestampEntry.cpp @@ -80,6 +80,7 @@ ErrorCode TimestampEntry::try_read_from_file(ZstdDecompressor& decompressor) { if (ErrorCodeSuccess != error_code) { return error_code; } + error_code = decompressor.try_read_string(column_len, m_key_name); if (ErrorCodeSuccess != error_code) { return error_code; diff --git a/components/core/src/clp_s/TimestampEntry.hpp b/components/core/src/clp_s/TimestampEntry.hpp index 57f08c5b2..ad40b4b89 100644 --- a/components/core/src/clp_s/TimestampEntry.hpp +++ b/components/core/src/clp_s/TimestampEntry.hpp @@ -55,23 +55,19 @@ class TimestampEntry { * Ingest a timestamp potentially adjusting the start and end bounds for this * TimestampEntry. * @param timestamp the timestamp to be ingested - * @return the epoch time corresponding to the string timestamp */ void ingest_timestamp(epochtime_t timestamp); void ingest_timestamp(double timestamp); /** * Merge a timestamp range potentially adjusting the start and end bounds for this - * * @param timestamp the timestamp to be ingested - * @return the epoch time corresponding to the string timestamp */ void merge_range(TimestampEntry const& entry); /** * Write the timestamp entry to a file * @param compressor - * @param column */ void write_to_file(ZstdCompressor& compressor) const; @@ -85,8 +81,6 @@ class TimestampEntry { /** * Read the timestamp entry from a file * @param decompressor - * @param column_name - * @param column_ids */ void read_from_file(ZstdDecompressor& decompressor); diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index bfb785c40..0e22e30be 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -141,7 +141,7 @@ int main(int argc, char const* argv[]) { auto timestamp_dict = clp_s::ReaderUtils::read_timestamp_dictionary(archives_dir); AddTimestampConditions add_timestamp_conditions( - timestamp_dict->get_authoritative_timestamp_column(), + timestamp_dict->get_authoritative_timestamp_tokenized_column(), command_line_arguments.get_search_begin_ts(), command_line_arguments.get_search_end_ts() ); @@ -198,10 +198,13 @@ int main(int argc, char const* argv[]) { output_handler = std::make_unique( command_line_arguments.get_mongodb_uri(), command_line_arguments.get_mongodb_collection(), - command_line_arguments.get_batch_size() + command_line_arguments.get_batch_size(), + command_line_arguments.get_max_num_results() ); } else { - output_handler = std::make_unique(); + output_handler = std::make_unique( + command_line_arguments.get_max_num_results() + ); } // output result diff --git a/components/core/src/clp_s/search/Output.cpp b/components/core/src/clp_s/search/Output.cpp index 5ec085ee6..a87e1d0a1 100644 --- a/components/core/src/clp_s/search/Output.cpp +++ b/components/core/src/clp_s/search/Output.cpp @@ -100,14 +100,24 @@ void Output::filter() { m_var_dict, m_log_dict, m_array_dict, - m_timestamp_dict + m_timestamp_dict, + m_output_handler->should_output_timestamp() ); reader.load(); reader.initialize_filter(this); - while (reader.get_next_message(message, this)) { - m_output_handler->write(message); + + if (m_output_handler->should_output_timestamp()) { + epochtime_t timestamp; + while (reader.get_next_message_with_timestamp(message, timestamp, this)) { + m_output_handler->write(message, timestamp); + } + } else { + while (reader.get_next_message(message, this)) { + m_output_handler->write(message); + } } + reader.close(); } @@ -138,10 +148,10 @@ void Output::init( VariableStringColumnReader* var_reader = dynamic_cast(column.second); if (m_match.schema_searches_against_column(schema_id, column.first)) { - if (clp_reader != nullptr && clp_reader->get_type() == "string") { + if (clp_reader != nullptr && clp_reader->get_type() == NodeType::CLPSTRING) { m_clp_string_readers[column.first] = clp_reader; m_other_columns.push_back(column.second); - } else if (var_reader != nullptr && var_reader->get_type() == "string") { + } else if (var_reader != nullptr && var_reader->get_type() == NodeType::VARSTRING) { m_var_string_readers[column.first] = var_reader; m_other_columns.push_back(column.second); } else if (auto date_column_reader = dynamic_cast(column.second)) diff --git a/components/core/src/clp_s/search/OutputHandler.cpp b/components/core/src/clp_s/search/OutputHandler.cpp index a3bb64255..ffb4cb03a 100644 --- a/components/core/src/clp_s/search/OutputHandler.cpp +++ b/components/core/src/clp_s/search/OutputHandler.cpp @@ -4,45 +4,63 @@ namespace clp_s::search { ResultsCacheOutputHandler::ResultsCacheOutputHandler( std::string const& uri, std::string const& collection, - uint64_t batch_size + uint64_t batch_size, + uint64_t max_num_results, + bool should_output_timestamp ) - : m_batch_size(batch_size) { + : OutputHandler(should_output_timestamp), + m_batch_size(batch_size), + m_max_num_results(max_num_results) { try { auto mongo_uri = mongocxx::uri(uri); m_client = mongocxx::client(mongo_uri); m_collection = m_client[mongo_uri.database()][collection]; + m_results.reserve(m_batch_size); } catch (mongocxx::exception const& e) { throw OperationFailed(ErrorCode::ErrorCodeBadParamDbUri, __FILENAME__, __LINE__); } } void ResultsCacheOutputHandler::flush() { + size_t count = 0; + while (false == m_latest_results.empty()) { + auto result = std::move(*m_latest_results.top()); + m_latest_results.pop(); + + try { + m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document( + bsoncxx::builder::basic::kvp("original_path", std::move(result.original_path)), + bsoncxx::builder::basic::kvp("message", std::move(result.message)), + bsoncxx::builder::basic::kvp("timestamp", result.timestamp) + ))); + count++; + + if (count == m_batch_size) { + m_collection.insert_many(m_results); + m_results.clear(); + count = 0; + } + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCode::ErrorCodeFailureDbBulkWrite, __FILE__, __LINE__); + } + } + try { if (false == m_results.empty()) { m_collection.insert_many(m_results); m_results.clear(); } } catch (mongocxx::exception const& e) { - throw OperationFailed(ErrorCode::ErrorCodeFailureDbBulkWrite, __FILENAME__, __LINE__); + throw OperationFailed(ErrorCode::ErrorCodeFailureDbBulkWrite, __FILE__, __LINE__); } } void ResultsCacheOutputHandler::write(std::string const& message, epochtime_t timestamp) { - try { - auto document = bsoncxx::builder::basic::make_document( - bsoncxx::builder::basic::kvp("original_path", ""), - bsoncxx::builder::basic::kvp("message", message), - bsoncxx::builder::basic::kvp("timestamp", timestamp) - ); - - m_results.push_back(std::move(document)); - - if (m_results.size() >= m_batch_size) { - m_collection.insert_many(m_results); - m_results.clear(); - } - } catch (mongocxx::exception const& e) { - throw OperationFailed(ErrorCode::ErrorCodeFailureDbBulkWrite, __FILENAME__, __LINE__); + if (m_latest_results.size() < m_max_num_results) { + m_latest_results.emplace(std::make_unique("", message, timestamp)); + } else if (m_latest_results.top()->timestamp < timestamp) { + m_latest_results.pop(); + m_latest_results.emplace(std::make_unique("", message, timestamp)); } } } // namespace clp_s::search diff --git a/components/core/src/clp_s/search/OutputHandler.hpp b/components/core/src/clp_s/search/OutputHandler.hpp index 1f644737f..60ff2f947 100644 --- a/components/core/src/clp_s/search/OutputHandler.hpp +++ b/components/core/src/clp_s/search/OutputHandler.hpp @@ -1,6 +1,7 @@ #ifndef CLP_S_SEARCH_OUTPUTHANDLER_HPP #define CLP_S_SEARCH_OUTPUTHANDLER_HPP +#include #include #include @@ -19,7 +20,8 @@ namespace clp_s::search { class OutputHandler { public: // Constructors - explicit OutputHandler() = default; + explicit OutputHandler(bool should_output_timestamp) + : m_should_output_timestamp(should_output_timestamp){}; // Destructor virtual ~OutputHandler() = default; @@ -42,6 +44,11 @@ class OutputHandler { * Flushes the output handler. */ virtual void flush() = 0; + + [[nodiscard]] bool should_output_timestamp() const { return m_should_output_timestamp; } + +protected: + bool m_should_output_timestamp; }; /** @@ -50,7 +57,8 @@ class OutputHandler { class StandardOutputHandler : public OutputHandler { public: // Constructors - explicit StandardOutputHandler() = default; + explicit StandardOutputHandler(bool should_output_timestamp = false) + : OutputHandler(should_output_timestamp) {} // Methods inherited from OutputHandler void write(std::string const& message, epochtime_t timestamp) override { @@ -67,6 +75,28 @@ class StandardOutputHandler : public OutputHandler { */ class ResultsCacheOutputHandler : public OutputHandler { public: + // Types + struct QueryResult { + // Constructors + QueryResult(std::string original_path, std::string message, epochtime_t timestamp) + : original_path(std::move(original_path)), + message(std::move(message)), + timestamp(timestamp) {} + + std::string original_path; + std::string message; + epochtime_t timestamp; + }; + + struct QueryResultGreaterTimestampComparator { + bool operator()( + std::unique_ptr const& r1, + std::unique_ptr const& r2 + ) const { + return r1->timestamp > r2->timestamp; + } + }; + class OperationFailed : public TraceableException { public: // Constructors @@ -78,7 +108,9 @@ class ResultsCacheOutputHandler : public OutputHandler { ResultsCacheOutputHandler( std::string const& uri, std::string const& collection, - uint64_t batch_size + uint64_t batch_size, + uint64_t max_num_results, + bool should_output_timestamp = true ); // Methods inherited from OutputHandler @@ -93,6 +125,12 @@ class ResultsCacheOutputHandler : public OutputHandler { mongocxx::collection m_collection; std::vector m_results; uint64_t m_batch_size; + uint64_t m_max_num_results; + std::priority_queue< + std::unique_ptr, + std::vector>, + QueryResultGreaterTimestampComparator> + m_latest_results; }; } // namespace clp_s::search diff --git a/components/core/src/clp_s/search/SearchUtils.cpp b/components/core/src/clp_s/search/SearchUtils.cpp index c255c1f38..bb6e648a2 100644 --- a/components/core/src/clp_s/search/SearchUtils.cpp +++ b/components/core/src/clp_s/search/SearchUtils.cpp @@ -36,6 +36,7 @@ LiteralType node_to_literal_type(NodeType type) { return LiteralType::EpochDateT; case NodeType::FLOATDATESTRING: return LiteralType::FloatDateT; + case NodeType::UNKNOWN: default: return LiteralType::UnknownT; }