Skip to content

Commit

Permalink
[Fix](Variant) handle scalar variant with none string root (apache#37794
Browse files Browse the repository at this point in the history
)

1. If columns is ColumnObject(Int32), root cast to ColumnString will
result crash
2. variant with invalid json will be stored as raw string instead of
throwing exception
  • Loading branch information
eldenmoon committed Jul 16, 2024
1 parent 26f1979 commit 92b5f88
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 11 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");
DEFINE_mBool(variant_enable_flatten_nested, "false");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "1000");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");

// block file cache
DEFINE_Bool(enable_file_cache, "false");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,8 @@ DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
// Threshold to estimate a column is sparsed
// Notice: TEST ONLY
DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
// Treat invalid json format str as string, instead of throwing exception if false
DECLARE_mBool(variant_throw_exeception_on_invalid_json);

DECLARE_mBool(enable_merge_on_write_correctness_check);
// USED FOR DEBUGING
Expand Down
27 changes: 18 additions & 9 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,30 +475,39 @@ Status _parse_variant_columns(Block& block, const std::vector<int>& variant_pos,
// already parsed
continue;
}
ColumnPtr raw_json_column;
ColumnPtr scalar_root_column;
if (WhichDataType(remove_nullable(var.get_root_type())).is_json()) {
// TODO more efficient way to parse jsonb type, currently we just convert jsonb to
// json str and parse them into variant
RETURN_IF_ERROR(cast_column({var.get_root(), var.get_root_type(), ""},
var.get_root()->is_nullable()
? make_nullable(std::make_shared<DataTypeString>())
: std::make_shared<DataTypeString>(),
&raw_json_column));
if (raw_json_column->is_nullable()) {
raw_json_column = assert_cast<const ColumnNullable*>(raw_json_column.get())
->get_nested_column_ptr();
&scalar_root_column));
if (scalar_root_column->is_nullable()) {
scalar_root_column = assert_cast<const ColumnNullable*>(scalar_root_column.get())
->get_nested_column_ptr();
}
} else {
const auto& root = *var.get_root();
raw_json_column =
scalar_root_column =
root.is_nullable()
? assert_cast<const ColumnNullable&>(root).get_nested_column_ptr()
: var.get_root();
}

variant_column = ColumnObject::create(true);
parse_json_to_variant(*variant_column.get(),
assert_cast<const ColumnString&>(*raw_json_column));
if (scalar_root_column->is_column_string()) {
variant_column = ColumnObject::create(true);
parse_json_to_variant(*variant_column.get(),
assert_cast<const ColumnString&>(*scalar_root_column));
} else {
// Root maybe other types rather than string like ColumnObject(Int32).
// In this case, we should finlize the root and cast to JSON type
auto expected_root_type =
make_nullable(std::make_shared<ColumnObject::MostCommonType>());
const_cast<ColumnObject&>(var).ensure_root_node_type(expected_root_type);
variant_column = var.assume_mutable();
}

// Wrap variant with nullmap if it is nullable
ColumnPtr result = variant_column->get_ptr();
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ struct ParseContext {
// record an extract json column, used for encoding row store
bool record_raw_json_column = false;
};

// three steps to parse and encode variant columns into flatterned columns
// 1. parse variant from raw json string
// 2. finalize variant column to each subcolumn least commn types, default ignore sparse sub columns
Expand Down
10 changes: 8 additions & 2 deletions be/src/vec/json/parse2column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,14 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length,
}
if (!result) {
VLOG_DEBUG << "failed to parse " << std::string_view(src, length) << ", length= " << length;
throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
std::string_view(src, length));
if (config::variant_throw_exeception_on_invalid_json) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
std::string_view(src, length));
}
// Treat as string
PathInData root_path;
Field field(src, length);
result = ParseResult {{root_path}, {field}};
}
auto& [paths, values] = *result;
assert(paths.size() == values.size());
Expand Down
95 changes: 95 additions & 0 deletions regression-test/suites/variant_p0/mtmv.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("regression_test_variant_mtmv"){
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "SET enable_materialized_view_rewrite=true"
sql "SET enable_materialized_view_nest_rewrite = true"
def load_json_data = {table_name, file_name ->
// load the json data
streamLoad {
table "${table_name}"

// set http request header params
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
set 'jsonpaths', '[\"$.v.id\", \"$.v.type\", \"$.v.actor\", \"$.v.repo\", \"$.v.payload\", \"$.v.public\", \"$.v.created_at\"]'
file file_name // import json file
time 10000 // limit inflight 10s

// if declared a check callback, the default check condition will ignore.
// So you must check all condition

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
logger.info("Stream load ${file_name} result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
// assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}

def table_name = "github_events_mtmv"
sql """DROP TABLE IF EXISTS ${table_name}"""
sql """
CREATE TABLE `${table_name}` (
`id` BIGINT NOT NULL,
`type` VARCHAR(30) NULL,
`actor` VARIANT NULL,
`repo` VARIANT NULL,
`payload` VARIANT NULL,
`public` BOOLEAN NULL,
`created_at` DATETIME NULL,
INDEX idx_payload (`payload`) USING INVERTED PROPERTIES("parser" = "english", "lower_case" = "true") COMMENT 'inverted index for payload'
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""")
sql """DROP MATERIALIZED VIEW IF EXISTS mv1"""
sql """
CREATE MATERIALIZED VIEW mv1
BUILD IMMEDIATE REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
as SELECT id, type, actor['id'], actor['display_login'], actor, payload['ref'] FROM github_events_mtmv limit 1024;
"""
String db = context.config.getDbNameByFile(context.file)
def job_name_1 = getJobName(db, "mv1")
waitingMTMVTaskFinished(job_name_1)

sql """DROP MATERIALIZED VIEW IF EXISTS mv2"""
sql """
CREATE MATERIALIZED VIEW mv2
BUILD IMMEDIATE REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
as SELECT id, cast(actor['id'] as bigint), payload FROM github_events_mtmv limit 1024;
"""
def job_name_2 = getJobName(db, "mv2")
waitingMTMVTaskFinished(job_name_2)
}

0 comments on commit 92b5f88

Please sign in to comment.