diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c8e62465b271ce..3590c7afd30221 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1000,6 +1000,7 @@ DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true"); DEFINE_mBool(variant_enable_flatten_nested, "false"); DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1"); DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "1000"); +DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false"); // block file cache DEFINE_Bool(enable_file_cache, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 9e34869892972a..a52a0357eb1bd8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1182,6 +1182,8 @@ DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column); // Threshold to estimate a column is sparsed // Notice: TEST ONLY DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column); +// If false, treat an invalid JSON string as a plain string instead of throwing an exception +DECLARE_mBool(variant_throw_exeception_on_invalid_json); DECLARE_mBool(enable_merge_on_write_correctness_check); // USED FOR DEBUGING diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index 4a048973c96b31..b85efe17148083 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -475,7 +475,7 @@ Status _parse_variant_columns(Block& block, const std::vector& variant_pos, // already parsed continue; } - ColumnPtr raw_json_column; + ColumnPtr scalar_root_column; if (WhichDataType(remove_nullable(var.get_root_type())).is_json()) { // TODO more efficient way to parse jsonb type, currently we just convert jsonb to // json str and parse them into variant @@ -483,22 +483,31 @@ Status _parse_variant_columns(Block& block, const std::vector& variant_pos, var.get_root()->is_nullable() ? 
make_nullable(std::make_shared()) : std::make_shared(), - &raw_json_column)); - if (raw_json_column->is_nullable()) { - raw_json_column = assert_cast(raw_json_column.get()) - ->get_nested_column_ptr(); + &scalar_root_column)); + if (scalar_root_column->is_nullable()) { + scalar_root_column = assert_cast(scalar_root_column.get()) + ->get_nested_column_ptr(); } } else { const auto& root = *var.get_root(); - raw_json_column = + scalar_root_column = root.is_nullable() ? assert_cast(root).get_nested_column_ptr() : var.get_root(); } - variant_column = ColumnObject::create(true); - parse_json_to_variant(*variant_column.get(), - assert_cast(*raw_json_column)); + if (scalar_root_column->is_column_string()) { + variant_column = ColumnObject::create(true); + parse_json_to_variant(*variant_column.get(), + assert_cast(*scalar_root_column)); + } else { + // The root may be of a type other than string, e.g. ColumnObject(Int32). + // In this case, we should finalize the root and cast it to the JSON type + auto expected_root_type = + make_nullable(std::make_shared()); + const_cast(var).ensure_root_node_type(expected_root_type); + variant_column = var.assume_mutable(); + } // Wrap variant with nullmap if it is nullable ColumnPtr result = variant_column->get_ptr(); diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 3a62c2bd6fe018..b012c7ccfc0e4e 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -83,6 +83,7 @@ struct ParseContext { // record an extract json column, used for encoding row store bool record_raw_json_column = false; }; + // three steps to parse and encode variant columns into flatterned columns // 1. parse variant from raw json string // 2. 
finalize variant column to each subcolumn least commn types, default ignore sparse sub columns diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp index a154ad14333363..a62405a7116c50 100644 --- a/be/src/vec/json/parse2column.cpp +++ b/be/src/vec/json/parse2column.cpp @@ -143,8 +143,14 @@ void parse_json_to_variant(IColumn& column, const char* src, size_t length, } if (!result) { VLOG_DEBUG << "failed to parse " << std::string_view(src, length) << ", length= " << length; - throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}", - std::string_view(src, length)); + if (config::variant_throw_exeception_on_invalid_json) { + throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}", + std::string_view(src, length)); + } + // Not throwing: fall back to treating the raw input as a single string value + PathInData root_path; + Field field(src, length); + result = ParseResult {{root_path}, {field}}; } auto& [paths, values] = *result; assert(paths.size() == values.size()); diff --git a/regression-test/suites/variant_p0/mtmv.groovy b/regression-test/suites/variant_p0/mtmv.groovy new file mode 100644 index 00000000000000..e411df243ad584 --- /dev/null +++ b/regression-test/suites/variant_p0/mtmv.groovy @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("regression_test_variant_mtmv"){ + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_materialized_view_nest_rewrite = true" + def load_json_data = {table_name, file_name -> + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'max_filter_ratio', '0.1' + set 'jsonpaths', '[\"$.v.id\", \"$.v.type\", \"$.v.actor\", \"$.v.repo\", \"$.v.payload\", \"$.v.public\", \"$.v.created_at\"]' + file file_name // import json file + time 10000 // limit inflight 10s + + // If a check callback is declared, the default check conditions are skipped, + // so every condition must be checked explicitly here. + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + logger.info("Stream load ${file_name} result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + // assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def table_name = "github_events_mtmv" + sql """DROP TABLE IF EXISTS ${table_name}""" + sql """ + CREATE TABLE `${table_name}` ( + `id` BIGINT NOT NULL, + `type` VARCHAR(30) NULL, + `actor` VARIANT NULL, + `repo` VARIANT NULL, + `payload` VARIANT NULL, + `public` BOOLEAN NULL, + `created_at` DATETIME NULL, + INDEX idx_payload (`payload`) USING INVERTED PROPERTIES("parser" = "english", "lower_case" = "true") COMMENT 'inverted index for payload' + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + 
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-3.json'}""") + load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2022-11-07-16.json'}""") + sql """DROP MATERIALIZED VIEW IF EXISTS mv1""" + sql """ + CREATE MATERIALIZED VIEW mv1 + BUILD IMMEDIATE REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + as SELECT id, type, actor['id'], actor['display_login'], actor, payload['ref'] FROM github_events_mtmv limit 1024; + """ + String db = context.config.getDbNameByFile(context.file) + def job_name_1 = getJobName(db, "mv1") + waitingMTMVTaskFinished(job_name_1) + + sql """DROP MATERIALIZED VIEW IF EXISTS mv2""" + sql """ + CREATE MATERIALIZED VIEW mv2 + BUILD IMMEDIATE REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + as SELECT id, cast(actor['id'] as bigint), payload FROM github_events_mtmv limit 1024; + """ + def job_name_2 = getJobName(db, "mv2") + waitingMTMVTaskFinished(job_name_2) +} \ No newline at end of file