Skip to content

Commit

Permalink
clp-s: Retrieve timestamps from search results; Add support for retur…
Browse files Browse the repository at this point in the history
…ning only N search results with latest timestamps. (#256)
  • Loading branch information
wraymo authored Feb 5, 2024
1 parent e80cee0 commit 59b2c22
Show file tree
Hide file tree
Showing 21 changed files with 367 additions and 187 deletions.
3 changes: 2 additions & 1 deletion components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ void ArchiveReader::open(ArchiveReaderOption& option) {
m_var_dict,
m_log_dict,
m_array_dict,
m_timestamp_dict
m_timestamp_dict,
false
);

schema_reader->load();
Expand Down
17 changes: 9 additions & 8 deletions components/core/src/clp_s/ColumnReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <variant>

#include "DictionaryReader.hpp"
#include "SchemaTree.hpp"
#include "TimestampDictionaryReader.hpp"
#include "Utils.hpp"
#include "ZstdDecompressor.hpp"
Expand Down Expand Up @@ -36,7 +37,7 @@ class BaseColumnReader {

int32_t get_id() const { return m_id; }

virtual std::string get_type() { return "base"; }
virtual NodeType get_type() { return NodeType::UNKNOWN; }

/**
* Extracts a value of the column
Expand All @@ -63,7 +64,7 @@ class Int64ColumnReader : public BaseColumnReader {
// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

std::string get_type() override { return "int"; }
NodeType get_type() override { return NodeType::INTEGER; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;
Expand All @@ -84,7 +85,7 @@ class FloatColumnReader : public BaseColumnReader {
// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

std::string get_type() override { return "float"; }
NodeType get_type() override { return NodeType::FLOAT; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;
Expand All @@ -105,7 +106,7 @@ class BooleanColumnReader : public BaseColumnReader {
// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

std::string get_type() override { return "bool"; }
NodeType get_type() override { return NodeType::BOOLEAN; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;
Expand Down Expand Up @@ -135,7 +136,7 @@ class ClpStringColumnReader : public BaseColumnReader {
// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

std::string get_type() override { return m_is_array ? "array" : "string"; }
NodeType get_type() override { return m_is_array ? NodeType::ARRAY : NodeType::CLPSTRING; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;
Expand Down Expand Up @@ -182,7 +183,7 @@ class VariableStringColumnReader : public BaseColumnReader {
// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

std::string get_type() override { return "string"; }
NodeType get_type() override { return NodeType::VARSTRING; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;
Expand Down Expand Up @@ -217,7 +218,7 @@ class DateStringColumnReader : public BaseColumnReader {
// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

std::string get_type() override { return "string"; }
NodeType get_type() override { return NodeType::DATESTRING; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;
Expand Down Expand Up @@ -246,7 +247,7 @@ class FloatDateStringColumnReader : public BaseColumnReader {
// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

std::string get_type() override { return "string"; }
NodeType get_type() override { return NodeType::FLOATDATESTRING; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/ColumnWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class FloatDateStringColumnWriter : public BaseColumnWriter {
std::shared_ptr<TimestampDictionaryWriter> timestamp_dict
)
: BaseColumnWriter(name, id),

m_timestamp_dict(std::move(timestamp_dict)) {}

// Destructor
Expand Down
10 changes: 9 additions & 1 deletion components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,11 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
po::value<uint64_t>(&m_batch_size)->value_name("BATCH_SIZE")->
default_value(m_batch_size),
"The number of documents to insert into MongoDB in a batch"
)(
"max-num-results",
po::value<uint64_t>(&m_max_num_results)->value_name("MAX_NUM_RESULTS")->
default_value(m_max_num_results),
"The maximum number of results to output"
);
// clang-format on
search_options.add(output_options);
Expand Down Expand Up @@ -421,9 +426,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"MongoDB uri and collection must be specified together"
);
}
if (m_batch_size == 0) {
if (0 == m_batch_size) {
throw std::invalid_argument("Batch size must be greater than 0");
}
if (0 == m_max_num_results) {
throw std::invalid_argument("Max number of results must be greater than 0");
}

m_mongodb_enabled = true;
}
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class CommandLineArguments {

uint64_t get_batch_size() const { return m_batch_size; }

uint64_t get_max_num_results() const { return m_max_num_results; }

std::string const& get_query() const { return m_query; }

std::optional<epochtime_t> get_search_begin_ts() const { return m_search_begin_ts; }
Expand Down Expand Up @@ -97,6 +99,7 @@ class CommandLineArguments {
std::string m_mongodb_uri;
std::string m_mongodb_collection;
uint64_t m_batch_size{1000};
uint64_t m_max_num_results{1000};

// Search variables
std::string m_query;
Expand Down
106 changes: 69 additions & 37 deletions components/core/src/clp_s/ReaderUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,51 +183,83 @@ std::vector<int32_t> ReaderUtils::get_schemas(std::string const& archive_path) {
return schemas;
}

BaseColumnReader* ReaderUtils::append_reader_column(
SchemaReader* reader,
int32_t column_id,
std::shared_ptr<SchemaTree> const& schema_tree,
std::shared_ptr<VariableDictionaryReader> const& var_dict,
std::shared_ptr<LogTypeDictionaryReader> const& log_dict,
std::shared_ptr<LogTypeDictionaryReader> const& array_dict,
std::shared_ptr<TimestampDictionaryReader> const& timestamp_dict
) {
BaseColumnReader* column_reader = nullptr;
auto node = schema_tree->get_node(column_id);
std::string key_name = node->get_key_name();
switch (node->get_type()) {
case NodeType::INTEGER:
column_reader = new Int64ColumnReader(key_name, column_id);
break;
case NodeType::FLOAT:
column_reader = new FloatColumnReader(key_name, column_id);
break;
case NodeType::CLPSTRING:
column_reader = new ClpStringColumnReader(key_name, column_id, var_dict, log_dict);
break;
case NodeType::VARSTRING:
column_reader = new VariableStringColumnReader(key_name, column_id, var_dict);
break;
case NodeType::BOOLEAN:
reader->append_column(new BooleanColumnReader(key_name, column_id));
break;
case NodeType::ARRAY:
column_reader
= new ClpStringColumnReader(key_name, column_id, var_dict, array_dict, true);
break;
case NodeType::DATESTRING:
column_reader = new DateStringColumnReader(key_name, column_id, timestamp_dict);
break;
case NodeType::FLOATDATESTRING:
column_reader = new FloatDateStringColumnReader(key_name, column_id);
break;
case NodeType::OBJECT:
case NodeType::NULLVALUE:
reader->append_column(column_id);
break;
case NodeType::UNKNOWN:
break;
}

if (column_reader) {
reader->append_column(column_reader);
}
return column_reader;
}

void ReaderUtils::append_reader_columns(
SchemaReader* reader,
Schema const& columns,
std::shared_ptr<SchemaTree> const& schema_tree,
std::shared_ptr<VariableDictionaryReader> const& var_dict,
std::shared_ptr<LogTypeDictionaryReader> const& log_dict,
std::shared_ptr<LogTypeDictionaryReader> const& array_dict,
std::shared_ptr<TimestampDictionaryReader> const& timestamp_dict
std::shared_ptr<TimestampDictionaryReader> const& timestamp_dict,
bool extract_timestamp
) {
for (int32_t column : columns) {
auto node = schema_tree->get_node(column);
std::string key_name = node->get_key_name();
switch (node->get_type()) {
case NodeType::INTEGER:
reader->append_column(new Int64ColumnReader(key_name, column));
break;
case NodeType::FLOAT:
reader->append_column(new FloatColumnReader(key_name, column));
break;
case NodeType::CLPSTRING:
reader->append_column(
new ClpStringColumnReader(key_name, column, var_dict, log_dict)
);
break;
case NodeType::VARSTRING:
reader->append_column(new VariableStringColumnReader(key_name, column, var_dict));
break;
case NodeType::BOOLEAN:
reader->append_column(new BooleanColumnReader(key_name, column));
break;
case NodeType::ARRAY:
reader->append_column(
new ClpStringColumnReader(key_name, column, var_dict, array_dict, true)
);
break;
case NodeType::DATESTRING:
reader->append_column(new DateStringColumnReader(key_name, column, timestamp_dict));
break;
case NodeType::FLOATDATESTRING:
reader->append_column(new FloatDateStringColumnReader(key_name, column));
break;
case NodeType::OBJECT:
case NodeType::NULLVALUE:
reader->append_column(column);
break;
auto timestamp_column_ids = timestamp_dict->get_authoritative_timestamp_column_ids();

for (int32_t column_id : columns) {
BaseColumnReader* column_reader = append_reader_column(
reader,
column_id,
schema_tree,
var_dict,
log_dict,
array_dict,
timestamp_dict
);

if (extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0) {
reader->mark_column_as_timestamp(column_reader);
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions components/core/src/clp_s/ReaderUtils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class ReaderUtils {
* @param log_dict
* @param array_dict
* @param timestamp_dict
* @param extract_timestamp
*/
static void append_reader_columns(
SchemaReader* reader,
Expand All @@ -111,6 +112,29 @@ class ReaderUtils {
std::shared_ptr<VariableDictionaryReader> const& var_dict,
std::shared_ptr<LogTypeDictionaryReader> const& log_dict,
std::shared_ptr<LogTypeDictionaryReader> const& array_dict,
std::shared_ptr<TimestampDictionaryReader> const& timestamp_dict,
bool extract_timestamp
);

private:
/**
* Appends a column to the given schema reader
* @param reader
* @param column_id
* @param schema_tree
* @param var_dict
* @param log_dict
* @param array_dict
* @param timestamp_dict
* @return the appended reader column
*/
static BaseColumnReader* append_reader_column(
SchemaReader* reader,
int32_t column_id,
std::shared_ptr<SchemaTree> const& schema_tree,
std::shared_ptr<VariableDictionaryReader> const& var_dict,
std::shared_ptr<LogTypeDictionaryReader> const& log_dict,
std::shared_ptr<LogTypeDictionaryReader> const& array_dict,
std::shared_ptr<TimestampDictionaryReader> const& timestamp_dict
);
};
Expand Down
Loading

0 comments on commit 59b2c22

Please sign in to comment.