Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-s): Unescape string values during ingestion and fix support for search using escape sequences. #622

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 0 additions & 2 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ set(
../clp/ffi/KeyValuePairLogEvent.hpp
../clp/ffi/SchemaTree.cpp
../clp/ffi/SchemaTree.hpp
../clp/ffi/utils.cpp
../clp/ffi/utils.hpp
../clp/ffi/Value.hpp
../clp/FileDescriptor.cpp
../clp/FileDescriptor.hpp
Expand Down
22 changes: 22 additions & 0 deletions components/core/src/clp_s/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "BufferViewReader.hpp"
#include "ColumnWriter.hpp"
#include "Utils.hpp"
#include "VariableDecoder.hpp"

namespace clp_s {
Expand Down Expand Up @@ -88,6 +89,20 @@ void ClpStringColumnReader::extract_string_value_into_buffer(
VariableDecoder::decode_variables_into_message(entry, *m_var_dict, encoded_vars, buffer);
}

// Serializes the value for `cur_message` into `buffer`, JSON-escaping it unless this column is an
// array column.
void ClpStringColumnReader::extract_escaped_string_value_into_buffer(
        uint64_t cur_message,
        std::string& buffer
) {
    // NOTE(review): array columns are written without escaping -- presumably because they already
    // hold serialized JSON; confirm against the ingestion path.
    if (m_is_array) {
        extract_string_value_into_buffer(cur_message, buffer);
        return;
    }

    // TODO: escape while decoding instead of after.
    std::string unescaped_value;
    extract_string_value_into_buffer(cur_message, unescaped_value);
    StringUtils::escape_json_string(buffer, unescaped_value);
}

int64_t ClpStringColumnReader::get_encoded_id(uint64_t cur_message) {
auto value = m_logtypes[cur_message];
return ClpStringColumnWriter::get_encoded_log_dict_id(value);
Expand Down Expand Up @@ -125,6 +140,13 @@ void VariableStringColumnReader::extract_string_value_into_buffer(
buffer.append(m_var_dict->get_value(m_variables[cur_message]));
}

// Looks up the dictionary entry for `cur_message` and writes its JSON-escaped form into `buffer`.
void VariableStringColumnReader::extract_escaped_string_value_into_buffer(
        uint64_t cur_message,
        std::string& buffer
) {
    auto const variable_id = m_variables[cur_message];
    StringUtils::escape_json_string(buffer, m_var_dict->get_value(variable_id));
}

// Returns the entry of m_variables at index cur_message. The same entry is what this reader passes
// to m_var_dict->get_value() when it extracts the string value for a message.
int64_t VariableStringColumnReader::get_variable_id(uint64_t cur_message) {
return m_variables[cur_message];
}
Expand Down
17 changes: 17 additions & 0 deletions components/core/src/clp_s/ColumnReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ class BaseColumnReader {
*/
virtual void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) = 0;

/**
* Extracts a value from the column, escapes it, and serializes it into a provided buffer as a
* string.
*
* This base implementation applies no escaping of its own: it forwards directly to
* extract_string_value_into_buffer. Derived readers whose values may need escaping are expected
* to override it.
* @param cur_message Index of the message whose value is extracted.
* @param buffer Buffer that receives the serialized value.
*/
virtual void
extract_escaped_string_value_into_buffer(uint64_t cur_message, std::string& buffer) {
extract_string_value_into_buffer(cur_message, buffer);
}

private:
int32_t m_id;
};
Expand Down Expand Up @@ -152,6 +163,9 @@ class ClpStringColumnReader : public BaseColumnReader {

void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

void
extract_escaped_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

/**
* Gets the encoded id of the variable
* @param cur_message
Expand Down Expand Up @@ -196,6 +210,9 @@ class VariableStringColumnReader : public BaseColumnReader {

void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

void
extract_escaped_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;

/**
* Gets the encoded id of the variable
* @param cur_message
Expand Down
66 changes: 15 additions & 51 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "../clp/ffi/ir_stream/IrUnitType.hpp"
#include "../clp/ffi/KeyValuePairLogEvent.hpp"
#include "../clp/ffi/SchemaTree.hpp"
#include "../clp/ffi/utils.hpp"
#include "../clp/ffi/Value.hpp"
#include "../clp/ir/EncodedTextAst.hpp"
#include "../clp/streaming_compression/zstd/Decompressor.hpp"
Expand Down Expand Up @@ -126,7 +125,7 @@ void JsonParser::parse_obj_in_array(ondemand::object line, int32_t parent_node_i
size_t object_start = m_current_schema.start_unordered_object(NodeType::Object);
ondemand::field cur_field;
ondemand::value cur_value;
std::string cur_key;
std::string_view cur_key;
int32_t node_id;
while (true) {
while (false == object_stack.empty() && object_it_stack.top() == object_stack.top().end()) {
Expand All @@ -144,7 +143,7 @@ void JsonParser::parse_obj_in_array(ondemand::object line, int32_t parent_node_i
}

cur_field = *object_it_stack.top();
cur_key = std::string_view(cur_field.unescaped_key(true));
cur_key = cur_field.unescaped_key(true);
cur_value = cur_field.value();

switch (cur_value.type()) {
Expand Down Expand Up @@ -194,9 +193,7 @@ void JsonParser::parse_obj_in_array(ondemand::object line, int32_t parent_node_i
break;
}
case ondemand::json_type::string: {
std::string value = std::string(
cur_value.raw_json_token().substr(1, cur_value.raw_json_token().size() - 2)
);
std::string_view value = cur_value.get_string(true);
if (value.find(' ') != std::string::npos) {
node_id = m_archive_writer
->add_node(node_id_stack.top(), NodeType::ClpString, cur_key);
Expand Down Expand Up @@ -272,9 +269,7 @@ void JsonParser::parse_array(ondemand::array array, int32_t parent_node_id) {
break;
}
case ondemand::json_type::string: {
std::string value = std::string(
cur_value.raw_json_token().substr(1, cur_value.raw_json_token().size() - 2)
);
std::string_view value = cur_value.get_string(true);
if (value.find(' ') != std::string::npos) {
node_id = m_archive_writer->add_node(parent_node_id, NodeType::ClpString, "");
} else {
Expand Down Expand Up @@ -309,7 +304,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s

ondemand::field cur_field;

std::string cur_key = key;
std::string_view cur_key = key;
node_id_stack.push(parent_node_id);

bool can_match_timestamp = !m_timestamp_column.empty();
Expand All @@ -320,7 +315,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s
do {
if (false == object_stack.empty()) {
cur_field = *object_it_stack.top();
cur_key = std::string(std::string_view(cur_field.unescaped_key(true)));
cur_key = cur_field.unescaped_key(true);
line = cur_field.value();
if (may_match_timestamp) {
if (object_stack.size() <= m_timestamp_column.size()
Expand Down Expand Up @@ -416,10 +411,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s
break;
}
case ondemand::json_type::string: {
auto raw_json_token = line.raw_json_token();
std::string value
= std::string(raw_json_token.substr(1, raw_json_token.rfind('"') - 1));

std::string_view value = line.get_string(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read from the docs that unmatched surrogate pairs are treated as an error without true. Do you think we should throw an error here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the behaviour with `true` specifically is that it will replace any invalid Unicode codepoint with a placeholder Unicode character. This ensures that this method always returns valid UTF-8, but can sometimes modify the original input.

The way I see it, if a user wants to ingest their data despite it containing some invalid Unicode, this is really one of the only good ways of doing it automatically.

That said, it's possible that some users would prefer failing ingestion so that they get direct feedback that they're producing invalid JSON. I could add a flag for this in this PR, or create a GitHub issue to track this feature and add it later, if you want?

Also, for what it's worth, I talked to @kirkrodrigues when deciding what to do here and he agreed that the automatic UTF-8 fix is probably desirable for large deployments.

if (matches_timestamp) {
node_id = m_archive_writer->add_node(
node_id_stack.top(),
Expand Down Expand Up @@ -661,18 +653,11 @@ auto JsonParser::add_node_to_archive_and_translations(
NodeType archive_node_type,
int32_t parent_node_id
) -> int {
auto validated_escaped_key
= clp::ffi::validate_and_escape_utf8_string(ir_node_to_add.get_key_name());
std::string node_key;
if (validated_escaped_key.has_value()) {
node_key = validated_escaped_key.value();
} else {
SPDLOG_ERROR("Key is not UTF-8 compliant: \"{}\"", ir_node_to_add.get_key_name());
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
int const curr_node_archive_id
= m_archive_writer->add_node(parent_node_id, archive_node_type, node_key);

int const curr_node_archive_id = m_archive_writer->add_node(
parent_node_id,
archive_node_type,
ir_node_to_add.get_key_name()
);
m_ir_node_to_archive_node_id_mapping.emplace(
std::make_pair(ir_node_id, archive_node_type),
curr_node_archive_id
Expand Down Expand Up @@ -766,23 +751,10 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
m_current_parsed_message.add_value(node_id, b_value);
} break;
case NodeType::VarString: {
auto const validated_escaped_string = clp::ffi::validate_and_escape_utf8_string(
pair.second.value().get_immutable_view<std::string>()
);
std::string str;
if (validated_escaped_string.has_value()) {
str = validated_escaped_string.value();
} else {
SPDLOG_ERROR(
"String is not utf8 compliant: \"{}\"",
pair.second.value().get_immutable_view<std::string>()
);
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
m_current_parsed_message.add_value(node_id, str);
auto const var_value{pair.second.value().get_immutable_view<std::string>()};
m_current_parsed_message.add_value(node_id, var_value);
} break;
case NodeType::ClpString: {
std::string encoded_str;
std::string decoded_value;
if (pair.second.value().is<clp::ir::EightByteEncodedTextAst>()) {
decoded_value = pair.second.value()
Expand All @@ -796,15 +768,7 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) {
.decode_and_unparse()
.value();
}
auto const validated_escaped_encoded_string
= clp::ffi::validate_and_escape_utf8_string(decoded_value.c_str());
if (validated_escaped_encoded_string.has_value()) {
encoded_str = validated_escaped_encoded_string.value();
} else {
SPDLOG_ERROR("Encoded string is not utf8 compliant: \"{}\"", decoded_value);
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
}
m_current_parsed_message.add_value(node_id, encoded_str);
m_current_parsed_message.add_value(node_id, decoded_value);
} break;
case NodeType::UnstructuredArray: {
std::string array_str;
Expand Down
23 changes: 19 additions & 4 deletions components/core/src/clp_s/JsonSerializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <vector>

#include "ColumnReader.hpp"
#include "Utils.hpp"

namespace clp_s {

class JsonSerializer {
public:
Expand Down Expand Up @@ -67,7 +70,11 @@ class JsonSerializer {
return false;
}

void add_special_key(std::string_view const key) { m_special_keys.emplace_back(key); }
/**
 * Registers a special key. The key is JSON-escaped once here and the escaped form is stored in
 * m_special_keys.
 * @param key The unescaped key.
 */
void add_special_key(std::string_view const key) {
    std::string escaped_key;
    StringUtils::escape_json_string(escaped_key, key);
    m_special_keys.emplace_back(escaped_key);
}

void begin_object() {
append_key();
Expand Down Expand Up @@ -109,11 +116,11 @@ class JsonSerializer {
m_json_string += "],";
}

void append_key() { append_key(m_special_keys[m_special_keys_index++]); }
void append_key() { append_escaped_key(m_special_keys[m_special_keys_index++]); }

void append_key(std::string_view const key) {
m_json_string += "\"";
m_json_string += key;
StringUtils::escape_json_string(m_json_string, key);
m_json_string += "\":";
}

Expand All @@ -130,11 +137,17 @@ class JsonSerializer {
void
append_value_from_column_with_quotes(clp_s::BaseColumnReader* column, uint64_t cur_message) {
m_json_string += "\"";
column->extract_string_value_into_buffer(cur_message, m_json_string);
column->extract_escaped_string_value_into_buffer(cur_message, m_json_string);
m_json_string += "\",";
}

private:
/**
 * Appends `"<key>":` to the JSON string being built. The key is written as-is, with no escaping
 * applied by this method.
 * @param key The key to append.
 */
void append_escaped_key(std::string_view const key) {
    m_json_string += '"';
    m_json_string += key;
    m_json_string += "\":";
}

std::string m_json_string;
std::vector<Op> m_op_list;
std::vector<std::string> m_special_keys;
Expand All @@ -143,4 +156,6 @@ class JsonSerializer {
size_t m_special_keys_index{0};
};

} // namespace clp_s

#endif // CLP_S_JSONSERIALIZER_HPP
Loading
Loading