Skip to content

Commit

Permalink
update explode_json_array
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Apr 20, 2024
1 parent 006e9ba commit 8948042
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
57 changes: 45 additions & 12 deletions be/src/vec/exprs/table_function/vexplode_json_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ auto min_value = std::numeric_limits<int64_t>::min(); //-9223372036854775808

int ParsedData::set_output(rapidjson::Document& document) {
int size = document.GetArray().Size();
_values_null_flag.resize(size, 0);
switch (_data_type) {
case ExplodeJsonArrayType::INT: {
_values_null_flag.resize(size, 0);
_backup_int.resize(size);
int i = 0;
for (auto& v : document.GetArray()) {
Expand Down Expand Up @@ -78,6 +78,7 @@ int ParsedData::set_output(rapidjson::Document& document) {
break;
}
case ExplodeJsonArrayType::DOUBLE: {
_values_null_flag.resize(size, 0);
_backup_double.resize(size);
int i = 0;
for (auto& v : document.GetArray()) {
Expand All @@ -93,12 +94,19 @@ int ParsedData::set_output(rapidjson::Document& document) {
}
case ExplodeJsonArrayType::STRING: {
int32_t wbytes = 0;
int i = 0;
_data_string_ref.clear();
_backup_string.clear();
_values_null_flag.clear();
for (auto& v : document.GetArray()) {
switch (v.GetType()) {
case rapidjson::Type::kStringType:
case rapidjson::Type::kStringType: {
_backup_string.emplace_back(v.GetString(), v.GetStringLength());
_values_null_flag.emplace_back(false);
break;
// Do not populate `_data_string_ref` here: the addresses of the strings
// stored in `_backup_string` may change every time `emplace_back()`
// is called on it.
}
case rapidjson::Type::kNumberType:
if (v.IsUint()) {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%u", v.GetUint());
Expand All @@ -112,39 +120,56 @@ int ParsedData::set_output(rapidjson::Document& document) {
wbytes = snprintf(tmp_buf, sizeof(tmp_buf), "%f", v.GetDouble());
}
_backup_string.emplace_back(tmp_buf, wbytes);
_values_null_flag.emplace_back(false);
// Do not populate `_data_string_ref` here: the addresses of the strings
// stored in `_backup_string` may change every time `emplace_back()`
// is called on it.
break;
case rapidjson::Type::kFalseType:
_backup_string.emplace_back(true_value);
_values_null_flag.emplace_back(false);
break;
case rapidjson::Type::kTrueType:
_backup_string.emplace_back(false_value);
_values_null_flag.emplace_back(false);
break;
case rapidjson::Type::kNullType:
_backup_string.emplace_back("", 0);
_values_null_flag[i] = 1;
_values_null_flag.emplace_back(true);
break;
default:
_backup_string.emplace_back("", 0);
_values_null_flag[i] = 1;
_values_null_flag.emplace_back(true);
break;
}
++i;
}
// `_data_string_ref` must be filled only after the loop above has finished,
// so that it records the final addresses of the strings in `_backup_string`.
for (auto& str : _backup_string) {
_data_string_ref.emplace_back(str.data(), str.length());
}
break;
}
case ExplodeJsonArrayType::JSON: {
int i = 0;
_data_string_ref.clear();
_backup_string.clear();
_values_null_flag.clear();
for (auto& v : document.GetArray()) {
if (v.IsObject()) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
v.Accept(writer);
_backup_string.emplace_back(buffer.GetString(), buffer.GetSize());
_values_null_flag.emplace_back(false);
} else {
_backup_string.emplace_back("", 0);
_values_null_flag[i] = 1;
_values_null_flag.emplace_back(true);
}
++i;
}
// `_data_string_ref` must be filled only after the loop above has finished,
// so that it records the final addresses of the strings in `_backup_string`.
for (auto& str : _backup_string) {
_data_string_ref.emplace_back(str);
}
break;
}
Expand All @@ -159,20 +184,23 @@ Status ParsedData::insert_result_from_parsed_data(MutableColumnPtr& column, int
int64_t cur_offset) {
switch (_data_type) {
case ExplodeJsonArrayType::INT: {
assert_cast<ColumnInt32*>(column.get())
assert_cast<ColumnInt64*>(column.get())
->insert_many_raw_data(
reinterpret_cast<const char*>(_backup_int.data() + cur_offset), max_step);
break;
}
case ExplodeJsonArrayType::DOUBLE: {
assert_cast<ColumnFloat64*>(column.get())
->insert_many_raw_data(
reinterpret_cast<const char*>(_backup_double.data() + cur_offset),
max_step);
break;
}
case ExplodeJsonArrayType::JSON:
case ExplodeJsonArrayType::STRING: {
assert_cast<ColumnString*>(column.get())
->insert_many_strings(_backup_string.data() + cur_offset, max_step);
->insert_many_strings(_data_string_ref.data() + cur_offset, max_step);
break;
}
default:
auto error_msg = fmt::format(
Expand All @@ -191,6 +219,7 @@ Status ParsedData::set_type(ExplodeJsonArrayType type) {
case ExplodeJsonArrayType::JSON:
case ExplodeJsonArrayType::STRING:
_data_type = type;
break;
default:
auto error_msg = fmt::format(
"Type not implemented:{} need check it in function explode array", type);
Expand Down Expand Up @@ -235,7 +264,11 @@ void VExplodeJsonArrayTableFunction::process_close() {
}

// Appends the value at the current cursor position to `column`.
//
// If current_empty() is true, `column->insert_default()` is called once.
// Otherwise `_parsed_data.insert_result_from_parsed_data()` is asked to
// insert starting at `_cur_offset` with a step of 1 (presumably a single
// element -- confirm against the max_step overload of get_value).
//
// NOTE: the previous placeholder `DCHECK(false)` ("should not run into
// VExplodeJsonArrayTableFunction::get_value") has been dropped; it would
// abort debug builds before the implementation below could run.
void VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column) {
    if (current_empty()) {
        column->insert_default();
        return;
    }
    // The returned Status is deliberately discarded: this overload returns
    // void and has no channel to report it to the caller.
    static_cast<void>(_parsed_data.insert_result_from_parsed_data(column, 1, _cur_offset));
}

int VExplodeJsonArrayTableFunction::get_value(MutableColumnPtr& column, int max_step) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exprs/table_function/vexplode_json_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ struct ParsedData {

std::vector<int64_t> _backup_int;
std::vector<double> _backup_double;
std::vector<StringRef> _backup_string;
std::vector<StringRef> _data_string_ref;
std::vector<std::string> _backup_string;
std::vector<UInt8> _values_null_flag;
ExplodeJsonArrayType _data_type;
char tmp_buf[128] = {0};
Expand All @@ -66,6 +67,7 @@ struct ParsedData {
_backup_double.clear();
_backup_string.clear();
_values_null_flag.clear();
_data_string_ref.clear();
}
};

Expand Down

0 comments on commit 8948042

Please sign in to comment.