From f0d936ac306f991383cf41b4ff4faef89a81eb38 Mon Sep 17 00:00:00 2001 From: Sun Chenyang Date: Wed, 4 Sep 2024 19:42:25 +0800 Subject: [PATCH 01/14] [fix] (compaction) fix compaction score in time series policy (#40242) ### BEFORE The compaction score is 0 when the merge conditions are not met in the time series policy. ### AFTER The compaction score is the sum of the compaction scores of the rowsets to be merged. --- ...mulative_compaction_time_series_policy.cpp | 68 +------------------ ...ive_compaction_time_series_policy_test.cpp | 2 +- 2 files changed, 2 insertions(+), 68 deletions(-) diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp index 64e51c77641311..6fa4b8d014313f 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.cpp +++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp @@ -27,14 +27,11 @@ namespace doris { uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) { uint32_t score = 0; - uint32_t level0_score = 0; bool base_rowset_exist = false; const int64_t point = tablet->cumulative_layer_point(); - int64_t level0_total_size = 0; RowsetMetaSharedPtr first_meta; int64_t first_version = INT64_MAX; - std::list checked_rs_metas; // NOTE: tablet._meta_lock is hold auto& rs_metas = tablet->tablet_meta()->all_rs_metas(); // check the base rowset and collect the rowsets of cumulative part @@ -53,12 +50,6 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( } else { // collect the rowsets of cumulative part score += rs_meta->get_compaction_score(); - if (rs_meta->compaction_level() == 0) { - level0_total_size += rs_meta->total_disk_size(); - level0_score += rs_meta->get_compaction_score(); - } else { - checked_rs_metas.push_back(rs_meta); - } } } @@ -73,64 +64,7 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return 0; } - // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size - int64_t compaction_goal_size_mbytes = - tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); - if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) { - return score; - } - - // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold - if (level0_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { - return score; - } - - // Condition 3: level1 achieve compaction_goal_size - if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { - checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) { - return a->version().first < b->version().first; - }); - int32_t rs_meta_count = 0; - int64_t continuous_size = 0; - for (const auto& rs_meta : checked_rs_metas) { - rs_meta_count++; - continuous_size += rs_meta->total_disk_size(); - if (rs_meta_count >= 2) { - if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) { - return score; - } - } - } - } - - int64_t now = UnixMillis(); - int64_t last_cumu = tablet->last_cumu_compaction_success_time(); - if (last_cumu != 0) { - int64_t cumu_interval = now - last_cumu; - - // Condition 4: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second - if (cumu_interval > - (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { - return score; - } - } else if (score > 0) { - // If the compaction process has not been successfully executed, - // the condition for triggering compaction based on the last successful compaction time (condition 3) will never be met - tablet->set_last_cumu_compaction_success_time(now); - } - - // Condition 5: If there is a continuous set of empty rowsets, prioritize merging. - std::vector input_rowsets; - std::vector candidate_rowsets = - tablet->pick_candidate_rowsets_to_cumulative_compaction(); - tablet->calc_consecutive_empty_rowsets( - &input_rowsets, candidate_rowsets, - tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold()); - if (!input_rowsets.empty()) { - return score; - } - - return 0; + return score; } void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point( diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp index 01963d591bedab..3e88e424e43c0b 100644 --- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp +++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp @@ -404,7 +404,7 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_scor const uint32_t score = _tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION, cumulative_compaction_policy); - EXPECT_EQ(0, score); + EXPECT_EQ(9, score); } TEST_F(TestTimeSeriesCumulativeCompactionPolicy, calc_cumulative_compaction_score_big_rowset) { From 2fd75937b24e07c9d88f7168c10528b9eda6a7c2 Mon Sep 17 00:00:00 2001 From: TengJianPing <18241664+jacktengg@users.noreply.github.com> Date: Wed, 4 Sep 2024 19:42:38 +0800 Subject: [PATCH 02/14] [fix](decimal) throw overflow exception if result is NaN of Infinit when converting from decimal to float (#40290) ## Proposed changes When casting `decimal256` to `float`, BE first converting `decimal256` to `long double`, and then converting to `float`, but for some values, the resulting float value is `std::inf`, this PR check this situation and report `Arithmetic overflow`. CPP test: ``` int main() { long double ld = std::stold("1.23456779999999999998e+67"); std::cout << "long double: " << ld << std::endl; float f = static_cast(ld); if (std::isnan(f)) { std::cout << "long double to float is nan" << std::endl; } else if (std::isinf(f)) { std::cout << "long double to float is inf" << std::endl; } else { std::cout << f << std::endl; } double d = static_cast(ld); if (std::isnan(d)) { std::cout << "long double to double is nan" << std::endl; } else if (std::isinf(d)) { std::cout << "long double to double is inf" << std::endl; } else { std::cout << "long double to double is " << d << std::endl; } return 0; } ``` result: ``` long double: 1.23457e+67 long double to float is inf long double to double is 1.23457e+67 ``` --- be/src/vec/data_types/data_type_decimal.h | 5 +- .../decimalv3/test_decimal256_cast.out | 10 +++ .../decimalv3/test_decimal256_cast.groovy | 71 +++++++++++++++++++ 3 files changed, 84 insertions(+), 2 deletions(-) diff --git a/be/src/vec/data_types/data_type_decimal.h b/be/src/vec/data_types/data_type_decimal.h index 580eb5fb3ea55d..b18487d1fb0966 100644 --- a/be/src/vec/data_types/data_type_decimal.h +++ b/be/src/vec/data_types/data_type_decimal.h @@ -595,10 +595,11 @@ void convert_from_decimal(typename ToDataType::FieldType* dst, dst[i] = static_cast(src[i].value) / multiplier.value; } } - FromDataType from_data_type(precision, scale); if constexpr (narrow_integral) { + FromDataType from_data_type(precision, scale); for (size_t i = 0; i < size; i++) { - if (dst[i] < min_result || dst[i] > max_result) { + if (std::isnan(dst[i]) || std::isinf(dst[i]) || dst[i] < min_result || + dst[i] > max_result) { THROW_DECIMAL_CONVERT_OVERFLOW_EXCEPTION(from_data_type.to_string(src[i]), from_data_type.get_name(), ToDataType {}.get_name()); diff --git a/regression-test/data/datatype_p0/decimalv3/test_decimal256_cast.out b/regression-test/data/datatype_p0/decimalv3/test_decimal256_cast.out index 0d79c0606906f2..ac1f3da16dd4fe 100644 --- a/regression-test/data/datatype_p0/decimalv3/test_decimal256_cast.out +++ b/regression-test/data/datatype_p0/decimalv3/test_decimal256_cast.out @@ -26,3 +26,13 @@ -- !decimal256_cast8 -- 0 +-- !decimal256_cast9 -- +-9 -999999999999999999999999999999999999999999999999999999999999999999.9999999999 +9 999999999999999999999999999999999999999999999999999999999999999999.9999999999 + +-- !decimal256_cast10 -- +10 0 + +-- !decimal256_cast_to_double_1 -- +1.2345678E7 + diff --git a/regression-test/suites/datatype_p0/decimalv3/test_decimal256_cast.groovy b/regression-test/suites/datatype_p0/decimalv3/test_decimal256_cast.groovy index ea3001232f1c08..0132e74010dbcc 100644 --- a/regression-test/suites/datatype_p0/decimalv3/test_decimal256_cast.groovy +++ b/regression-test/suites/datatype_p0/decimalv3/test_decimal256_cast.groovy @@ -18,6 +18,9 @@ suite("test_decimal256_cast") { sql "set enable_nereids_planner = true;" sql "set enable_decimal256 = true;" + sql """ + set debug_skip_fold_constant=true; + """ qt_decimal256_cast0 """SELECT /*+ SET_VAR(enable_fold_constant_by_be = false) */ cast(999999999999999999999999999999999999999999999999999999999999999999.9999999999 as decimalv3(76,10));""" @@ -41,4 +44,72 @@ suite("test_decimal256_cast") { select cast('0.000000000000000000000000000000000000000000000000000000000000000000000012345678901' as decimalv3(76,0)); """ + sql """ + drop table if exists cast_to_dec256; + """ + sql """ + create table cast_to_dec256 ( + k1 int, + v1 varchar(128) + ) distributed by hash(k1) + properties ( + 'replication_num' = '1' + ); + """ + sql """ + insert into cast_to_dec256 values(9, "999999999999999999999999999999999999999999999999999999999999999999.9999999999"), + (-9, "-999999999999999999999999999999999999999999999999999999999999999999.9999999999"); + """ + qt_decimal256_cast9 """ + select k1, cast(v1 as decimalv3(76,10)) from cast_to_dec256 order by k1, v1; + """ + + sql """ + truncate table cast_to_dec256; + """ + sql """ + insert into cast_to_dec256 values(10, "0.000000000000000000000000000000000000000000000000000000000000000000000012345678901"); + """ + qt_decimal256_cast10 """ + select k1, cast(v1 as decimalv3(76, 0)) from cast_to_dec256 order by k1, v1; + """ + + test { + sql """ + select /*+SET_VAR(enable_fold_constant_by_be = true) */cast(cast("12345678.000000000000000000000000000000001" as decimalv3(76, 60)) as float); + """ + exception "Arithmetic overflow" + } + test { + sql """ + select /*+SET_VAR(enable_fold_constant_by_be = false) */cast(cast("12345678.000000000000000000000000000000001" as decimalv3(76, 60)) as float); + """ + exception "Arithmetic overflow" + } + + sql """ + drop table if exists dec256cast_to_float; + """ + sql """ + create table dec256cast_to_float ( + k1 int, + v1 decimalv3(76, 60) + ) distributed by hash(k1) + properties ( + 'replication_num' = '1' + ); + """ + sql """ + insert into dec256cast_to_float values (1, "12345678.000000000000000000000000000000001"); + """ + test { + sql """ + select cast(v1 as float) from dec256cast_to_float; + """ + exception "Arithmetic overflow" + } + qt_decimal256_cast_to_double_1 """ + select cast(v1 as double) from dec256cast_to_float; + """ + } \ No newline at end of file From 6621ae688c191017461baa901c35e1eef3d49e32 Mon Sep 17 00:00:00 2001 From: Dongyang Li Date: Wed, 4 Sep 2024 19:46:33 +0800 Subject: [PATCH 03/14] [fix](ci) add retry to avoid compile fail (#40381) ## Proposed changes Issue Number: close #xxx Co-authored-by: stephen --- cloud/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/CMakeLists.txt b/cloud/CMakeLists.txt index bb697d791e7116..7a273dd04e29c4 100644 --- a/cloud/CMakeLists.txt +++ b/cloud/CMakeLists.txt @@ -435,7 +435,7 @@ endif() if (NOT EXISTS "${THIRDPARTY_SRC}/${FDB_LIB}") file(MAKE_DIRECTORY ${THIRDPARTY_SRC}) - execute_process(COMMAND "curl" "${FDB_LIB_URL}" + execute_process(COMMAND "curl --retry 10 --retry-delay 2 --retry-max-time 30" "${FDB_LIB_URL}" "-o" "${THIRDPARTY_SRC}/${FDB_LIB}" "-k" RESULTS_VARIABLE DOWNLOAD_RET) if (NOT ${DOWNLOAD_RET} STREQUAL "0") From 9963ec2ddfbdf9143a6779febb1b6bf9c83e1baf Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Wed, 4 Sep 2024 19:48:10 +0800 Subject: [PATCH 04/14] [chore](Nereids) do not fallback if parse failed for all case (#40331) --- .../org/apache/doris/qe/ConnectProcessor.java | 47 +++++++++---------- .../datatype_p0/bitmap/test_bitmap_int.groovy | 2 +- ...test_nestedtypes_insert_into_select.groovy | 4 +- .../suites/demo_p0/test_action.groovy | 2 +- .../hive/ddl/test_hive_ddl.groovy | 2 +- .../update_on_current_timestamp.groovy | 2 +- .../grouping_sets/test_grouping_sets.groovy | 2 +- .../suites/query_p0/join/test_join2.groovy | 2 +- 8 files changed, 29 insertions(+), 34 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 91a3dbaad947b5..8f6faf41598cc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.KillStmt; import org.apache.doris.analysis.LiteralExpr; -import org.apache.doris.analysis.NotFallbackInParser; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; @@ -300,6 +299,27 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex nereidsSyntaxException = e; } } + + if (stmts == null && !ctx.getSessionVariable().enableFallbackToOriginalPlanner) { + String errMsg; + Throwable exception = null; + if (nereidsParseException != null) { + errMsg = nereidsParseException.getMessage(); + exception = nereidsParseException; + } else if (nereidsSyntaxException != null) { + errMsg = nereidsSyntaxException.getMessage(); + exception = nereidsSyntaxException; + } else { + errMsg = "Nereids parse statements failed. " + originStmt; + } + if (exception == null) { + exception = new AnalysisException(errMsg); + } else { + exception = new AnalysisException(errMsg, exception); + } + handleQueryException(exception, originStmt, null, null); + return; + } } // stmts == null when Nereids cannot planner this query or Nereids is disabled. @@ -325,31 +345,6 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex } } - if (mysqlCommand == MysqlCommand.COM_QUERY - && ctx.getSessionVariable().isEnableNereidsPlanner() - && !ctx.getSessionVariable().enableFallbackToOriginalPlanner - && !stmts.isEmpty() - && stmts.stream().allMatch(s -> s instanceof NotFallbackInParser)) { - String errMsg; - Throwable exception = null; - if (nereidsParseException != null) { - errMsg = nereidsParseException.getMessage(); - exception = nereidsParseException; - } else if (nereidsSyntaxException != null) { - errMsg = nereidsSyntaxException.getMessage(); - exception = nereidsSyntaxException; - } else { - errMsg = "Nereids parse DQL failed. " + originStmt; - } - if (exception == null) { - exception = new AnalysisException(errMsg); - } else { - exception = new AnalysisException(errMsg, exception); - } - handleQueryException(exception, originStmt, null, null); - return; - } - List origSingleStmtList = null; // if stmts.size() > 1, split originStmt to multi singleStmts if (stmts.size() > 1) { diff --git a/regression-test/suites/datatype_p0/bitmap/test_bitmap_int.groovy b/regression-test/suites/datatype_p0/bitmap/test_bitmap_int.groovy index c890b27fada609..38725987d96fc4 100644 --- a/regression-test/suites/datatype_p0/bitmap/test_bitmap_int.groovy +++ b/regression-test/suites/datatype_p0/bitmap/test_bitmap_int.groovy @@ -68,7 +68,7 @@ suite("test_bitmap_int") { test { sql """SELECT case id_bitmap when 1 then 1 else 0 FROM test_bitmap;""" - exception "Syntax error in line 1" + exception "missing 'END' at 'FROM'" } qt_sql64_4 """SELECT id_bitmap FROM test_bitmap WHERE id_bitmap is null LIMIT 20;""" diff --git a/regression-test/suites/datatype_p0/nested_types/query/test_nestedtypes_insert_into_select.groovy b/regression-test/suites/datatype_p0/nested_types/query/test_nestedtypes_insert_into_select.groovy index 2023ecf641c0f0..633ad98d86f556 100644 --- a/regression-test/suites/datatype_p0/nested_types/query/test_nestedtypes_insert_into_select.groovy +++ b/regression-test/suites/datatype_p0/nested_types/query/test_nestedtypes_insert_into_select.groovy @@ -32,7 +32,7 @@ suite("test_nestedtypes_insert_into_select", "p0") { test { sql "insert into ast values ('text' , [named_struct('a',1,'b','home'),named_struct('a',2,'b','work')]);" - exception "errCode = 2, detailMessage = Sql parser can't convert the result to array, please check your sql." + exception "mismatched input 'named_struct' expecting" } @@ -50,6 +50,6 @@ suite("test_nestedtypes_insert_into_select", "p0") { test { sql "insert into ast values ('text' , [named_struct('a',1,'b','home'),named_struct('a',2,'b','work')]);" - exception "Sql parser can't convert the result to array" + exception "mismatched input 'named_struct' expecting" } } diff --git a/regression-test/suites/demo_p0/test_action.groovy b/regression-test/suites/demo_p0/test_action.groovy index 1cae0da47e3a0b..7f0d4d7eb8ba24 100644 --- a/regression-test/suites/demo_p0/test_action.groovy +++ b/regression-test/suites/demo_p0/test_action.groovy @@ -19,7 +19,7 @@ suite("test_action") { test { sql "abcdefg" // check exception message contains - exception "Syntax error in line 1" + exception "extraneous input 'abcdefg'" } test { diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy index aa4d8c0b3c6801..1d39ab9cde5503 100644 --- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy +++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ddl.groovy @@ -362,7 +362,7 @@ suite("test_hive_ddl", "p0,external,hive,external_docker,external_docker_hive") sql """ CREATE TABLE schema_check ENGINE=hive ; """ - exception "AnalysisException, msg: Should contain at least one column in a table" + exception "Should contain at least one column in a table" } sql """ DROP DATABASE IF EXISTS `test_hive_loc` """ } diff --git a/regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy b/regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy index 597facb3ea345b..ef5496a7a76fcb 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy @@ -190,6 +190,6 @@ suite("nereids_update_on_current_timestamp") { k int, `update_time` datetime(6) default current_timestamp(4) on update current_timestamp(3)) replace, ) AGGREGATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 1 properties("replication_num" = "1");""" - exception "Syntax error in line 3" + exception "mismatched input 'replace'" } } diff --git a/regression-test/suites/query_p0/grouping_sets/test_grouping_sets.groovy b/regression-test/suites/query_p0/grouping_sets/test_grouping_sets.groovy index 76a7476080090d..85bfae6a13c301 100644 --- a/regression-test/suites/query_p0/grouping_sets/test_grouping_sets.groovy +++ b/regression-test/suites/query_p0/grouping_sets/test_grouping_sets.groovy @@ -98,7 +98,7 @@ suite("test_grouping_sets", "p0") { SELECT k1, k3, MAX( k8 ) FROM test_query_db.test GROUP BY k1, GROUPING SETS ( (k1, k3), (k1), ( ) ), ROLLUP(k1, k3) """ - exception "Syntax error" + exception "mismatched input 'SETS'" } qt_select13""" diff --git a/regression-test/suites/query_p0/join/test_join2.groovy b/regression-test/suites/query_p0/join/test_join2.groovy index fb8cdb95f00a72..9158133948f754 100644 --- a/regression-test/suites/query_p0/join/test_join2.groovy +++ b/regression-test/suites/query_p0/join/test_join2.groovy @@ -84,7 +84,7 @@ suite("test_join2", "query,p0,arrow_flight_sql") { FROM ${TBname1} NATURAL JOIN ${TBname2} ORDER BY 1,2,3,4,5,6; """ - exception "natural join is not supported" + exception "mismatched input 'NATURAL'" } qt_join4 """ From edb6eec2dab7a93d89fe29cc315239d5e25cd738 Mon Sep 17 00:00:00 2001 From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com> Date: Wed, 4 Sep 2024 19:59:04 +0800 Subject: [PATCH 05/14] [fix](OrcWriter) fix be core when upgrading BE without upgrading FE (#40282) Since we have changed the type mapping from Doris to Orc type, using the `Outfile` to export Date/Datetime types will cause BE core dump when only upgrading BE without upgrading FE. --- be/src/pipeline/exec/result_sink_operator.h | 4 ++++ be/src/vec/sink/writer/vfile_result_writer.cpp | 5 +++++ .../main/java/org/apache/doris/analysis/OutFileClause.java | 1 + gensrc/thrift/DataSinks.thrift | 7 +++++++ 4 files changed, 17 insertions(+) diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 06b961b2a31694..33e32e93633453 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -58,6 +58,7 @@ struct ResultFileOptions { std::string file_suffix; //Bring BOM when exporting to CSV format bool with_bom = false; + int64_t orc_writer_version = 0; ResultFileOptions(const TResultFileSinkOptions& t_opt) { file_path = t_opt.file_path; @@ -108,6 +109,9 @@ struct ResultFileOptions { if (t_opt.__isset.orc_compression_type) { orc_compression_type = t_opt.orc_compression_type; } + if (t_opt.__isset.orc_writer_version) { + orc_writer_version = t_opt.orc_writer_version; + } } }; diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index 16491311c17ae6..5161cf2928d409 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -84,6 +84,11 @@ VFileResultWriter::VFileResultWriter( Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) { _state = state; _init_profile(profile); + // check orc writer version + if (_file_opts->file_format == TFileFormatType::FORMAT_ORC && + _file_opts->orc_writer_version < 1) { + return Status::InternalError("orc writer version is less than 1."); + } // Delete existing files if (_file_opts->delete_existing_files) { RETURN_IF_ERROR(_delete_dir()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 6debdca789f125..ef65b405853765 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -884,6 +884,7 @@ public TResultFileSinkOptions toSinkOptions() { if (isOrcFormat()) { sinkOptions.setOrcSchema(serializeOrcSchema()); sinkOptions.setOrcCompressionType(orcCompressionType); + sinkOptions.setOrcWriterVersion(1); } return sinkOptions; } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 14f528666a3121..e46f7e6067cfef 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -133,6 +133,13 @@ struct TResultFileSinkOptions { 18: optional bool with_bom; 19: optional PlanNodes.TFileCompressType orc_compression_type; + + // Since we have changed the type mapping from Doris to Orc type, + // using the Outfile to export Date/Datetime types will cause BE core dump + // when only upgrading BE without upgrading FE. + // orc_writer_version = 1 means doris FE is higher than version 2.1.5 + // orc_writer_version = 0 means doris FE is less than or equal to version 2.1.5 + 20: optional i64 orc_writer_version; } struct TMemoryScratchSink { From 3856acad352adffc0e017d6fc8e6f519b98c5ad8 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 4 Sep 2024 20:06:39 +0800 Subject: [PATCH 06/14] [Opt](delete) Skip newly inserted rows check in non-strict mode partial update if the row's delete sign is marked (#40322) --- .../olap/rowset/segment_v2/segment_writer.cpp | 3 +- .../segment_v2/vertical_segment_writer.cpp | 3 +- .../partial_update/partial_update_delete.csv | 3 +- .../test_partial_update_delete.out | 338 ++++++++++++++++-- .../test_partial_update_delete.groovy | 292 ++++++++++----- 5 files changed, 527 insertions(+), 112 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 105433d2689e01..9f5811f67909bc 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -596,7 +596,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* segment_pos); } else { - if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) { + if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update && + !have_delete_sign) { std::string error_column; for (auto cid : _opts.rowset_ctx->partial_update_info->missing_cids) { const TabletColumn& col = _tablet_schema->column(cid); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 64f72bc0c4669d..7ec9236a4d8cd5 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -455,7 +455,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da DeleteBitmap::TEMP_VERSION_COMMON}, segment_pos); } else { - if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update) { + if (!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update && + !have_delete_sign) { std::string error_column; for (auto cid : _opts.rowset_ctx->partial_update_info->missing_cids) { const TabletColumn& col = _tablet_schema->column(cid); diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv index 5f5fbe759f10b9..e22b5b747a60e7 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_delete.csv @@ -1,3 +1,4 @@ 1 2 -3 \ No newline at end of file +3 +7 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out index e1623d42460181..7a639b8d6e29db 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out @@ -1,21 +1,21 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !sql -- +-- !sql1 -- 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 --- !sql -- +-- !sql1 -- 2 2 2 2 2 4 4 4 4 4 5 5 5 5 5 --- !sql -- +-- !sql1 -- 4 4 4 4 4 5 5 5 5 5 --- !with_delete_sign -- +-- !with_delete_sign1 -- 1 \N \N 0 \N 1 1 1 1 1 1 0 2 \N \N 0 \N 1 @@ -25,44 +25,291 @@ 4 4 4 4 4 0 5 5 5 5 5 0 --- !sql -- +-- !sql2 -- 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 +6 6 6 6 6 --- !sql -- +-- !sql2 -- 4 4 4 4 4 5 5 5 5 5 +6 6 6 6 6 --- !sql -- -1 \N \N \N \N 1 +-- !sql2 -- +5 5 5 5 5 +6 6 6 6 6 + +-- !sql2 -- +1 \N \N 0 \N 1 +1 1 1 1 1 0 +2 \N \N 0 \N 1 +2 2 2 2 2 0 +3 \N \N 0 \N 1 +3 3 3 3 3 0 +4 \N \N 0 \N 1 +4 4 4 4 4 0 +5 5 5 5 5 0 +6 6 6 6 6 0 +7 \N \N 0 \N 1 +8 \N \N 0 \N 1 +9 \N \N 0 \N 1 + +-- !sql3 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +1 1 1 1 1 0 1 +1 1 1 1 1 1 1 +2 2 2 2 2 0 2 +2 2 2 2 2 1 2 +3 3 3 3 3 0 3 +3 3 3 3 3 1 3 +4 4 4 4 4 0 4 +4 4 4 4 4 1 4 +5 5 5 5 5 0 5 +6 6 6 6 6 0 6 +7 \N \N 0 \N 1 \N +8 \N \N 0 \N 1 \N +9 \N \N 0 \N 1 \N + +-- !sql4 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql1 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql1 -- +2 2 2 2 2 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql1 -- +4 4 4 4 4 +5 5 5 5 5 + +-- !with_delete_sign1 -- +1 \N \N 0 \N 1 +1 1 1 1 1 0 +2 \N \N 0 \N 1 +2 2 2 2 2 0 +3 \N \N 0 \N 1 +3 3 3 3 3 0 +4 4 4 4 4 0 +5 5 5 5 5 0 + +-- !sql2 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql2 -- +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql2 -- +5 5 5 5 5 +6 6 6 6 6 + +-- !sql2 -- +1 \N \N 0 \N 1 +1 1 1 1 1 0 +2 \N \N 0 \N 1 +2 2 2 2 2 0 +3 \N \N 0 \N 1 +3 3 3 3 3 0 +4 \N \N 0 \N 1 +4 4 4 4 4 0 +5 5 5 5 5 0 +6 6 6 6 6 0 +7 \N \N 0 \N 1 +8 \N \N 0 \N 1 +9 \N \N 0 \N 1 + +-- !sql3 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +1 1 1 1 1 0 1 +1 1 1 1 1 1 1 +2 2 2 2 2 0 2 +2 2 2 2 2 1 2 +3 3 3 3 3 0 3 +3 3 3 3 3 1 3 +4 4 4 4 4 0 4 +4 4 4 4 4 1 4 +5 5 5 5 5 0 5 +6 6 6 6 6 0 6 +7 \N \N 0 \N 1 \N +8 \N \N 0 \N 1 \N +9 \N \N 0 \N 1 \N + +-- !sql4 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql1 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql1 -- +2 2 2 2 2 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql1 -- +4 4 4 4 4 +5 5 5 5 5 + +-- !with_delete_sign1 -- +1 \N \N 0 \N 1 1 1 1 1 1 0 -2 \N \N \N \N 1 +2 \N \N 0 \N 1 2 2 2 2 2 0 -3 \N \N \N \N 1 +3 \N \N 0 \N 1 3 3 3 3 3 0 4 4 4 4 4 0 5 5 5 5 5 0 --- !sql -- +-- !sql2 -- 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 +6 6 6 6 6 + +-- !sql2 -- +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql2 -- +5 5 5 5 5 +6 6 6 6 6 --- !sql -- +-- !sql2 -- +1 \N \N 0 \N 1 +1 1 1 1 1 0 +2 \N \N 0 \N 1 +2 2 2 2 2 0 +3 \N \N 0 \N 1 +3 3 3 3 3 0 +4 \N \N 0 \N 1 +4 4 4 4 4 0 +5 5 5 5 5 0 +6 6 6 6 6 0 +7 \N \N 0 \N 1 +8 \N \N 0 \N 1 +9 \N \N 0 \N 1 + +-- !sql3 -- +1 1 1 1 1 2 2 2 2 2 +3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 +6 6 6 6 6 --- !sql -- +-- !sql3 -- 4 4 4 4 4 5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +5 5 5 5 5 +6 6 6 6 6 --- !with_delete_sign -- +-- !sql3 -- +1 1 1 1 1 0 1 +1 1 1 1 1 1 1 +2 2 2 2 2 0 2 +2 2 2 2 2 1 2 +3 3 3 3 3 0 3 +3 3 3 3 3 1 3 +4 4 4 4 4 0 4 +4 4 4 4 4 1 4 +5 5 5 5 5 0 5 +6 6 6 6 6 0 6 +7 \N \N 0 \N 1 \N +8 \N \N 0 \N 1 \N +9 \N \N 0 \N 1 \N + +-- !sql4 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql1 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql1 -- +2 2 2 2 2 +4 4 4 4 4 +5 5 5 5 5 + +-- !sql1 -- +4 4 4 4 4 +5 5 5 5 5 + +-- !with_delete_sign1 -- 1 \N \N 0 \N 1 1 1 1 1 1 0 2 \N \N 0 \N 1 @@ -72,24 +319,75 @@ 4 4 4 4 4 0 5 5 5 5 5 0 --- !sql -- +-- !sql2 -- 1 1 1 1 1 2 2 2 2 2 3 3 3 3 3 4 4 4 4 4 5 5 5 5 5 +6 6 6 6 6 --- !sql -- +-- !sql2 -- 4 4 4 4 4 5 5 5 5 5 +6 6 6 6 6 --- !sql -- -1 \N \N \N \N 1 +-- !sql2 -- +5 5 5 5 5 +6 6 6 6 6 + +-- !sql2 -- +1 \N \N 0 \N 1 1 1 1 1 1 0 -2 \N \N \N \N 1 +2 \N \N 0 \N 1 2 2 2 2 2 0 -3 \N \N \N \N 1 +3 \N \N 0 \N 1 3 3 3 3 3 0 +4 \N \N 0 \N 1 4 4 4 4 4 0 5 5 5 5 5 0 +6 6 6 6 6 0 +7 \N \N 0 \N 1 +8 \N \N 0 \N 1 +9 \N \N 0 \N 1 + +-- !sql3 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +5 5 5 5 5 +6 6 6 6 6 + +-- !sql3 -- +1 1 1 1 1 0 1 +1 1 1 1 1 1 1 +2 2 2 2 2 0 2 +2 2 2 2 2 1 2 +3 3 3 3 3 0 3 +3 3 3 3 3 1 3 +4 4 4 4 4 0 4 +4 4 4 4 4 1 4 +5 5 5 5 5 0 5 +6 6 6 6 6 0 6 +7 \N \N 0 \N 1 \N +8 \N \N 0 \N 1 \N +9 \N \N 0 \N 1 \N + +-- !sql4 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 +4 4 4 4 4 +5 5 5 5 5 +6 6 6 6 6 diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy index 0d83d94f91c918..fc59038122a1d2 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy @@ -20,96 +20,210 @@ suite('test_partial_update_delete') { String db = context.config.getDbNameByFile(context.file) sql "select 1;" // to create database - for (def use_row_store : [false, true]) { - logger.info("current params: use_row_store: ${use_row_store}") - - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql "use ${db};" - - def tableName1 = "test_partial_update_delete1" - sql "DROP TABLE IF EXISTS ${tableName1};" - sql """ CREATE TABLE IF NOT EXISTS ${tableName1} ( - `k1` int NOT NULL, - `c1` int, - `c2` int, - `c3` int NOT NULL, - `c4` int - )UNIQUE KEY(k1) - DISTRIBUTED BY HASH(k1) BUCKETS 1 - PROPERTIES ( - "enable_unique_key_merge_on_write" = "true", - "disable_auto_compaction" = "true", - "replication_num" = "1", - "store_row_column" = "${use_row_store}"); """ - - def tableName2 = "test_partial_update_delete2" - sql "DROP TABLE IF EXISTS ${tableName2};" - sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( - `k` BIGINT NULL - ) UNIQUE KEY(k) - DISTRIBUTED BY HASH(k) BUCKETS 1 - PROPERTIES ( - "enable_unique_key_merge_on_write" = "true", - "disable_auto_compaction" = "true", - "replication_num" = "1", - "store_row_column" = "${use_row_store}"); """ - - sql "insert into ${tableName1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" - qt_sql "select * from ${tableName1} order by k1;" - sql "insert into ${tableName2} values(1),(3);" - sql "delete from ${tableName1} A using ${tableName2} B where A.k1=B.k;" - qt_sql "select * from ${tableName1} order by k1;" - - sql "delete from ${tableName1} where c2=2;" - qt_sql "select * from ${tableName1} order by k1;" - - sql "set skip_delete_sign=true;" - sql "set skip_storage_engine_merge=true;" - sql "set skip_delete_bitmap=true;" - qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - sql "drop table if exists ${tableName1};" - sql "drop table if exists ${tableName2};" - - sql "set skip_delete_sign=false;" - sql "set skip_storage_engine_merge=false;" - sql "set skip_delete_bitmap=false;" - def tableName3 = "test_partial_update_delete3" - sql "DROP TABLE IF EXISTS ${tableName3};" - sql """ CREATE TABLE IF NOT EXISTS ${tableName3} ( - `k1` int NOT NULL, - `c1` int, - `c2` int, - `c3` int, - `c4` int - )UNIQUE KEY(k1) - DISTRIBUTED BY HASH(k1) BUCKETS 1 - PROPERTIES ( - "enable_unique_key_merge_on_write" = "true", - "disable_auto_compaction" = "true", - "replication_num" = "1", - "store_row_column" = "${use_row_store}"); """ - - sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" - qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" - streamLoad { - table "${tableName3}" - - set 'column_separator', ',' - set 'format', 'csv' - set 'columns', 'k1' - set 'partial_columns', 'true' - set 'merge_type', 'DELETE' - - file 'partial_update_delete.csv' - time 10000 + for (def use_nereids : [true, false]) { + for (def use_row_store : [false, true]) { + logger.info("current params: use_nereids: ${use_nereids}, use_row_store: ${use_row_store}") + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql "use ${db};" + if (use_nereids) { + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + } else { + sql "set enable_nereids_planner=false" + } + sql "sync;" + + def tableName1 = "test_partial_update_delete1" + sql "DROP TABLE IF EXISTS ${tableName1};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int NOT NULL, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "store_row_column" = "${use_row_store}"); """ + + + def tableName2 = "test_partial_update_delete2" + sql "DROP TABLE IF EXISTS ${tableName2};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( + `k` BIGINT NULL + ) UNIQUE KEY(k) + DISTRIBUTED BY HASH(k) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "store_row_column" = "${use_row_store}"); """ + + sql "insert into ${tableName1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);" + qt_sql1 "select * from ${tableName1} order by k1;" + sql "insert into ${tableName2} values(1),(3);" + sql "delete from ${tableName1} A using ${tableName2} B where A.k1=B.k;" + qt_sql1 "select * from ${tableName1} order by k1;" + + sql "delete from ${tableName1} where c2=2;" + qt_sql1 "select * from ${tableName1} order by k1;" + + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_with_delete_sign1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + sql "drop table if exists ${tableName1};" + sql "drop table if exists ${tableName2};" + + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName3 = "test_partial_update_delete3" + sql "DROP TABLE IF EXISTS ${tableName3};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName3} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int NOT NULL, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "store_row_column" = "${use_row_store}"); """ + + sql "insert into ${tableName3} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5),(6,6,6,6,6);" + qt_sql2 "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + streamLoad { + table "${tableName3}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1' + set 'partial_columns', 'true' + set 'merge_type', 'DELETE' + file 'partial_update_delete.csv' + time 10000 + } + sql "sync" + qt_sql2 "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + + sql "set enable_insert_strict=false;" + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${tableName3}(k1, __DORIS_DELETE_SIGN__) values(8,1),(4,1),(9,1);" + qt_sql2 "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" + sql "set enable_insert_strict=true;" + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + sql "drop table if exists ${tableName3};" + + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName4 = "test_partial_update_delete4" + sql "DROP TABLE IF EXISTS ${tableName4};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName4} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int NOT NULL, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "store_row_column" = "${use_row_store}", + "function_column.sequence_col" = "c3"); """ + + sql "insert into ${tableName4} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5),(6,6,6,6,6);" + qt_sql3 "select k1,c1,c2,c3,c4 from ${tableName4} order by k1,c1,c2,c3,c4;" + // if the table has sequence map col, can not set sequence map col when merge_type=delete + streamLoad { + table "${tableName4}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1' + set 'partial_columns', 'true' + set 'merge_type', 'DELETE' + file 'partial_update_delete.csv' + time 10000 + } + sql "sync" + qt_sql3 "select k1,c1,c2,c3,c4 from ${tableName4} order by k1,c1,c2,c3,c4;" + + sql "set enable_insert_strict=false;" + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${tableName4}(k1, __DORIS_DELETE_SIGN__) values(8,1),(4,1),(9,1);" + qt_sql3 "select k1,c1,c2,c3,c4 from ${tableName4} order by k1,c1,c2,c3,c4;" + sql "set enable_insert_strict=true;" + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql3 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__,__DORIS_SEQUENCE_COL__ from ${tableName4} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" + sql "drop table if exists ${tableName4};" + + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName5 = "test_partial_update_delete5" + sql "DROP TABLE IF EXISTS ${tableName5};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName5} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int NOT NULL, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "store_row_column" = "${use_row_store}", + "function_column.sequence_type" = "int"); """ + sql "insert into ${tableName5}(k1,c1,c2,c3,c4,__DORIS_SEQUENCE_COL__) values(1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3),(4,4,4,4,4,4),(5,5,5,5,5,5),(6,6,6,6,6,6);" + qt_sql4 "select k1,c1,c2,c3,c4 from ${tableName5} order by k1,c1,c2,c3,c4;" + // if the table has sequence type col, users must set sequence col even if merge_type=delete + streamLoad { + table "${tableName5}" + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'k1' + set 'partial_columns', 'true' + set 'merge_type', 'DELETE' + file 'partial_update_delete.csv' + time 10000 + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains('need to specify the sequence column')) + } + } + sql "drop table if exists ${tableName5};" } - sql "sync" - qt_sql "select k1,c1,c2,c3,c4 from ${tableName3} order by k1,c1,c2,c3,c4;" - sql "set skip_delete_sign=true;" - sql "set skip_storage_engine_merge=true;" - sql "set skip_delete_bitmap=true;" - qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" - sql "drop table if exists ${tableName3};" } } } From 9ad1f236eabf4b12a28a24a1fa79acec5fc7c585 Mon Sep 17 00:00:00 2001 From: lihangyu <15605149486@163.com> Date: Wed, 4 Sep 2024 21:04:54 +0800 Subject: [PATCH 07/14] [Fix](ShortCircuit) consider delete sign flag when hits row (#40300) If partial update with delete clause, the new rowset delete sign should be considered --- be/src/service/point_query_executor.cpp | 18 +++++++++++++ be/src/service/point_query_executor.h | 5 ++++ .../data/point_query_p0/test_point_query.out | 8 ++++++ .../point_query_p0/test_point_query.groovy | 27 +++++++++++++++++++ 4 files changed, 58 insertions(+) diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 88293ba9b03675..8058e1f1be6302 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -49,7 +49,9 @@ #include "runtime/thread_context.h" #include "util/key_util.h" #include "util/runtime_profile.h" +#include "util/simd/bits.h" #include "util/thrift_util.h" +#include "vec/columns/columns_number.h" #include "vec/data_types/serde/data_type_serde.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" @@ -143,6 +145,9 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vectorroot(), tuple_desc(), output_slot_descs); } + // get the delete sign idx in block + _delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()]; + if (schema.have_column(BeConsts::ROW_STORE_COL)) { const auto& column = *DORIS_TRY(schema.column(BeConsts::ROW_STORE_COL)); _row_store_column_ids = column.unique_id(); @@ -483,6 +488,19 @@ Status PointQueryExecutor::_lookup_row_data() { } } } + // filter rows by delete sign + if (_row_hits > 0 && _reusable->delete_sign_idx() != -1) { + vectorized::ColumnPtr delete_filter_columns = + _result_block->get_columns()[_reusable->delete_sign_idx()]; + const auto& filter = + assert_cast(delete_filter_columns.get())->get_data(); + size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); + if (count == filter.size()) { + _result_block->clear(); + } else if (count > 0) { + return Status::NotSupported("Not implemented since only single row at present"); + } + } return Status::OK(); } diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index 29c7348e3d5b97..19954479c97ec7 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -100,6 +100,9 @@ class Reusable { RuntimeState* runtime_state() { return _runtime_state.get(); } + // delete sign idx in block + int32_t delete_sign_idx() const { return _delete_sign_idx; } + private: // caching TupleDescriptor, output_expr, etc... std::unique_ptr _runtime_state; @@ -118,6 +121,8 @@ class Reusable { std::unordered_set _missing_col_uids; // included cids in rowstore(column group) std::unordered_set _include_col_uids; + // delete sign idx in block + int32_t _delete_sign_idx = -1; }; // RowCache is a LRU cache for row store diff --git a/regression-test/data/point_query_p0/test_point_query.out b/regression-test/data/point_query_p0/test_point_query.out index 5a4e0b6617826f..1cc4142e39f306 100644 --- a/regression-test/data/point_query_p0/test_point_query.out +++ b/regression-test/data/point_query_p0/test_point_query.out @@ -152,3 +152,11 @@ -- !sql -- 0 1111111 +-- !sql -- +10 20 aabc value + +-- !sql -- + +-- !sql -- +-10 20 aabc update val + diff --git a/regression-test/suites/point_query_p0/test_point_query.groovy b/regression-test/suites/point_query_p0/test_point_query.groovy index f27c366efbbd0b..f05d0af1305a68 100644 --- a/regression-test/suites/point_query_p0/test_point_query.groovy +++ b/regression-test/suites/point_query_p0/test_point_query.groovy @@ -304,4 +304,31 @@ suite("test_point_query", "nonConcurrent") { sql """set global enable_nereids_planner=true""" sql "set global enable_fallback_to_original_planner = true" } + + // test partial update/delete + sql "DROP TABLE IF EXISTS table_3821461" + sql """ + CREATE TABLE `table_3821461` ( + `col1` smallint NOT NULL, + `col2` int NOT NULL, + `loc3` char(10) NOT NULL, + `value` char(10) NOT NULL, + INDEX col3 (`loc3`) USING INVERTED, + INDEX col2 (`col2`) USING INVERTED ) + ENGINE=OLAP UNIQUE KEY(`col1`, `col2`, `loc3`) + DISTRIBUTED BY HASH(`col1`, `col2`, `loc3`) BUCKETS 1 + PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "bloom_filter_columns" = "col1", "store_row_column" = "true" ); + """ + sql "insert into table_3821461 values (-10, 20, 'aabc', 'value')" + sql "insert into table_3821461 values (10, 20, 'aabc', 'value');" + sql "insert into table_3821461 values (20, 30, 'aabc', 'value');" + explain { + sql("select * from table_3821461 where col1 = -10 and col2 = 20 and loc3 = 'aabc'") + contains "SHORT-CIRCUIT" + } + qt_sql "select * from table_3821461 where col1 = 10 and col2 = 20 and loc3 = 'aabc';" + sql "delete from table_3821461 where col1 = 10 and col2 = 20 and loc3 = 'aabc';" + qt_sql "select * from table_3821461 where col1 = 10 and col2 = 20 and loc3 = 'aabc';" + sql "update table_3821461 set value = 'update value' where col1 = -10 or col1 = 20;" + qt_sql """select * from table_3821461 where col1 = -10 and col2 = 20 and loc3 = 'aabc'""" } \ No newline at end of file From d177d5b004876c11a06fa2975794286436ec9e22 Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Wed, 4 Sep 2024 21:27:17 +0800 Subject: [PATCH 08/14] [chore](cloud) Support starting both meta-service and recycler within single process (#40223) e.g. the following will start meta-service and recycler within single process. ``` ./bin/start.sh --daemon ``` the log file will be meta_service.INFO* and it is the same effect as `./bin/start.sh --meta-service --recycler --daemon` doc PR https://github.com/apache/doris-website/pull/1073 --- cloud/script/start.sh | 27 +++++++---- cloud/src/common/config.h | 1 + cloud/src/main.cpp | 60 ++++++++++++++----------- cloud/src/recycler/checker.cpp | 2 + cloud/src/recycler/recycler.cpp | 5 +++ cloud/src/recycler/recycler_service.cpp | 2 +- cloud/test/recycler_test.cpp | 1 + 7 files changed, 63 insertions(+), 35 deletions(-) diff --git a/cloud/script/start.sh b/cloud/script/start.sh index 28e986166ae832..582c80c2e6fa4c 100644 --- a/cloud/script/start.sh +++ b/cloud/script/start.sh @@ -122,7 +122,10 @@ fi echo "LIBHDFS3_CONF=${LIBHDFS3_CONF}" -export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:5000,dirty_decay_ms:5000,oversize_threshold:0,prof:false,lg_prof_interval:-1" +# to enable dump jeprof heap stats prodigally, change `prof:false` to `prof:true` +# to control the dump interval change `lg_prof_interval` to a specific value, it is pow/exponent of 2 in size of bytes, default 34 means 2 ** 34 = 16GB +# to control the dump path, change `prof_prefix` to a specific path, e.g. /doris_cloud/log/ms_, by default it dumps at the path where the start command called +export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:5000,dirty_decay_ms:5000,oversize_threshold:0,prof_prefix:ms_,prof:false,lg_prof_interval:34" if [[ "${RUN_VERSION}" -eq 1 ]]; then "${bin}" --version @@ -131,14 +134,22 @@ fi mkdir -p "${DORIS_HOME}/log" echo "starts ${process} with args: $*" +out_file=${DORIS_HOME}/log/${process}.out if [[ "${RUN_DAEMON}" -eq 1 ]]; then - date >>"${DORIS_HOME}/log/${process}.out" - nohup "${bin}" "$@" >>"${DORIS_HOME}/log/${process}.out" 2>&1 & - # wait for log flush - sleep 1.5 - tail -n10 "${DORIS_HOME}/log/${process}.out" | grep 'working directory' -B1 -A10 - echo "please check process log for more details" - echo "" + # append 10 blank lines to ensure the following tail -n10 works correctly + printf "\n\n\n\n\n\n\n\n\n\n" >>"${out_file}" + echo "$(date +'%F %T') try to start ${process}" >>"${out_file}" + nohup "${bin}" "$@" >>"${out_file}" 2>&1 & + echo "wait and check ${process} start successfully" + sleep 3 + tail -n10 "${out_file}" | grep 'successfully started brpc' + ret=$? + if [[ ${ret} -ne 0 ]]; then + echo "${process} may not start successfully please check process log for more details" + exit 1 + fi + echo "${process} start successfully" + exit 0 elif [[ "${RUN_CONSOLE}" -eq 1 ]]; then export DORIS_LOG_TO_STDERR=1 date diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index e31a60a0d69f80..b1db41a6eb734e 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -77,6 +77,7 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min +CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60"); // log a warning if a recycle task takes longer than this duration CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h diff --git a/cloud/src/main.cpp b/cloud/src/main.cpp index 9356a3546d03a9..74e6a8daaf161c 100644 --- a/cloud/src/main.cpp +++ b/cloud/src/main.cpp @@ -161,13 +161,13 @@ DECLARE_int64(socket_max_unwritten_bytes); int main(int argc, char** argv) { if (argc > 1) { if (auto ret = args.parse(argc - 1, argv + 1); !ret.empty()) { - std::cerr << ret << std::endl; + std::cerr << "parse arguments error: " << ret << std::endl; help(); return -1; } } - if (argc < 2 || args.get(ARG_HELP)) { + if (args.get(ARG_HELP)) { help(); return 0; } @@ -177,21 +177,16 @@ int main(int argc, char** argv) { return 0; } - // FIXME(gavin): do we need to enable running both MS and recycler within - // single process - if (!(args.get(ARG_META_SERVICE) ^ args.get(ARG_RECYCLER))) { - std::cerr << "only one of --meta-service and --recycler must be specified" << std::endl; - return 1; - } - - // There may be more roles to play + // There may be more roles to play in the future, if there are multi roles specified, + // use meta_service as the process name std::string process_name = args.get(ARG_META_SERVICE) ? "meta_service" : args.get(ARG_RECYCLER) ? "recycler" - : ""; - if (process_name.empty()) { - std::cerr << "failed to determine prcess name with given args" << std::endl; - return 1; - } + : "meta_service"; + + using namespace std::chrono; + + auto start = steady_clock::now(); + auto end = start; auto pid_file_fd_holder = gen_pidfile("doris_cloud"); if (pid_file_fd_holder == nullptr) { @@ -215,11 +210,19 @@ int main(int argc, char** argv) { } // We can invoke glog from now on - std::string msg; + LOG(INFO) << "try to start doris_cloud"; LOG(INFO) << build_info(); std::cout << build_info() << std::endl; + if (!args.get(ARG_META_SERVICE) && !args.get(ARG_RECYCLER)) { + std::get<0>(args.args()[ARG_META_SERVICE]) = true; + std::get<0>(args.args()[ARG_RECYCLER]) = true; + LOG(INFO) << "meta_service and recycler are both not specified, " + "run doris_cloud as meta_service and recycler by default"; + std::cout << "run doris_cloud as meta_service and recycler by default" << std::endl; + } + brpc::Server server; brpc::FLAGS_max_body_size = config::brpc_max_body_size; brpc::FLAGS_socket_max_unwritten_bytes = config::brpc_socket_max_unwritten_bytes; @@ -238,19 +241,22 @@ int main(int argc, char** argv) { return 1; } LOG(INFO) << "begin to init txn kv"; + auto start_init_kv = steady_clock::now(); int ret = txn_kv->init(); if (ret != 0) { LOG(WARNING) << "failed to init txnkv, ret=" << ret; return 1; } - LOG(INFO) << "successfully init txn kv"; + end = steady_clock::now(); + LOG(INFO) << "successfully init txn kv, elapsed milliseconds: " + << duration_cast(end - start_init_kv).count(); if (init_global_encryption_key_info_map(txn_kv.get()) != 0) { LOG(WARNING) << "failed to init global encryption key map"; return -1; } - std::unique_ptr meta_server; + std::unique_ptr meta_server; // meta-service std::unique_ptr recycler; std::thread periodiccally_log_thread; std::mutex periodiccally_log_thread_lock; @@ -269,7 +275,8 @@ int main(int argc, char** argv) { msg = "meta-service started"; LOG(INFO) << msg; std::cout << msg << std::endl; - } else if (args.get(ARG_RECYCLER)) { + } + if (args.get(ARG_RECYCLER)) { recycler = std::make_unique(txn_kv); int ret = recycler->start(&server); if (ret != 0) { @@ -284,15 +291,12 @@ int main(int argc, char** argv) { auto periodiccally_log = [&]() { while (periodiccally_log_thread_run) { std::unique_lock lck {periodiccally_log_thread_lock}; - periodiccally_log_thread_cv.wait_for( - lck, std::chrono::milliseconds(config::periodically_log_ms)); + periodiccally_log_thread_cv.wait_for(lck, + milliseconds(config::periodically_log_ms)); LOG(INFO) << "Periodically log for recycler"; } }; periodiccally_log_thread = std::thread {periodiccally_log}; - } else { - std::cerr << "cloud starts without doing anything and exits" << std::endl; - return -1; } // start service brpc::ServerOptions options; @@ -309,7 +313,11 @@ int main(int argc, char** argv) { << ", errmsg=" << strerror_r(errno, buf, 64) << ", port=" << port; return -1; } - LOG(INFO) << "successfully started brpc listening on port=" << port; + end = steady_clock::now(); + msg = "successfully started brpc listening on port=" + std::to_string(port) + + " time_elapsed_ms=" + std::to_string(duration_cast(end - start).count()); + LOG(INFO) << msg; + std::cout << msg << std::endl; server.RunUntilAskedToQuit(); // Wait for signals server.ClearServices(); @@ -326,7 +334,7 @@ int main(int argc, char** argv) { periodiccally_log_thread_run = false; // immediately notify the log thread to quickly exit in case it block the // whole procedure - periodiccally_log_thread_cv.notify_one(); + periodiccally_log_thread_cv.notify_all(); } periodiccally_log_thread.join(); } diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp index 49421f97ca0aeb..c3e9f69ed9d6bc 100644 --- a/cloud/src/recycler/checker.cpp +++ b/cloud/src/recycler/checker.cpp @@ -79,6 +79,8 @@ int Checker::start() { // launch instance scanner auto scanner_func = [this]() { + std::this_thread::sleep_for( + std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds)); while (!stopped()) { std::vector instances; get_all_instances(txn_kv_.get(), instances); diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 9db16a18c13d4c..76d4a7ca767a55 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -189,6 +189,11 @@ Recycler::~Recycler() { } void Recycler::instance_scanner_callback() { + // sleep 60 seconds before scheduling for the launch procedure to complete: + // some bad hdfs connection may cause some log to stdout stderr + // which may pollute .out file and affect the script to check success + std::this_thread::sleep_for( + std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds)); while (!stopped()) { std::vector instances; get_all_instances(txn_kv_.get(), instances); diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp index 3c1a5b2ab48d95..08e937a4106ba5 100644 --- a/cloud/src/recycler/recycler_service.cpp +++ b/cloud/src/recycler/recycler_service.cpp @@ -448,7 +448,7 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller, } status_code = 404; - msg = "not found"; + msg = "http path " + uri.path() + " not found, it may be not implemented"; response_body = msg; } diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index d767c1bd8b7393..146873548398bf 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -64,6 +64,7 @@ int main(int argc, char** argv) { using namespace std::chrono; current_time = duration_cast(system_clock::now().time_since_epoch()).count(); + config::recycler_sleep_before_scheduling_seconds = 0; // we dont have to wait in UT ::testing::InitGoogleTest(&argc, argv); auto s3_producer_pool = std::make_shared(config::recycle_pool_parallelism); From 553de2d83abb1b990256a09d1d81e63fbe5107b7 Mon Sep 17 00:00:00 2001 From: Gavin Chou Date: Wed, 4 Sep 2024 21:33:57 +0800 Subject: [PATCH 09/14] [fix](cloud) Fix missing privilege to storage vault after restarting FE (#40260) The previous implement forgets to build `storageVaultPrivTable` (in-memory) after loading auth information from image, which means the privileges are persisted but unable to use after restarting FE. Note: a new image will be generated after FE restarts or a checkpoint is triggered, the newly generated will be used when FE starts. --- .../apache/doris/mysql/privilege/Role.java | 19 +- .../doris/regression/suite/Suite.groovy | 13 +- .../suites/vaults/privilege_restart.groovy | 178 ++++++++++++++++++ 3 files changed, 198 insertions(+), 12 deletions(-) create mode 100644 regression-test/suites/vaults/privilege_restart.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java index 8354c655e2a7f3..0054579062fcc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java @@ -662,14 +662,15 @@ private void grantPrivs(ResourcePattern resourcePattern, PrivBitSet privs) throw break; case CLUSTER: cloudClusterPrivTable.addEntry(entry, false, false); - LOG.info("cloud cluster add list {}", cloudClusterPrivTable); + LOG.info("cloud cluster priv table after add {}", cloudClusterPrivTable); break; case STAGE: cloudStagePrivTable.addEntry(entry, false, false); - LOG.info("cloud stage add list {}", cloudStagePrivTable); + LOG.info("cloud stage priv table after add {}", cloudStagePrivTable); break; case STORAGE_VAULT: storageVaultPrivTable.addEntry(entry, false, false); + LOG.info("cloud storage vault priv table after add {}", storageVaultPrivTable); break; default: throw new DdlException("Unknown resource type: " + resourcePattern.getResourceType() + " name=" @@ -1166,18 +1167,26 @@ private void rebuildPrivTables() { workloadGroupPrivTable = new WorkloadGroupPrivTable(); cloudClusterPrivTable = new ResourcePrivTable(); cloudStagePrivTable = new ResourcePrivTable(); + storageVaultPrivTable = new ResourcePrivTable(); for (Entry entry : tblPatternToPrivs.entrySet()) { try { grantPrivs(entry.getKey(), entry.getValue().copy()); } catch (DdlException e) { - LOG.warn("grant failed,", e); + LOG.warn("grant tblPatternToPrivs failed,", e); } } for (Entry entry : resourcePatternToPrivs.entrySet()) { try { grantPrivs(entry.getKey(), entry.getValue().copy()); } catch (DdlException e) { - LOG.warn("grant failed,", e); + LOG.warn("grant resourcePatternToPrivs failed,", e); + } + } + for (Entry entry : storageVaultPatternToPrivs.entrySet()) { + try { + grantPrivs(entry.getKey(), entry.getValue().copy()); + } catch (DdlException e) { + LOG.warn("grant storageVaultPatternToPrivs failed,", e); } } for (Entry entry : clusterPatternToPrivs.entrySet()) { @@ -1204,7 +1213,7 @@ private void rebuildPrivTables() { try { grantPrivs(entry.getKey(), entry.getValue().copy()); } catch (DdlException e) { - LOG.warn("grant failed,", e); + LOG.warn("grant workloadGroupPatternToPrivs failed,", e); } } } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index f5e6e5ab1bab71..1587363dc0cb9d 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1394,11 +1394,11 @@ class Suite implements GroovyInterceptable { } boolean enableStoragevault() { + boolean ret = false; if (context.config.metaServiceHttpAddress == null || context.config.metaServiceHttpAddress.isEmpty() || - context.config.metaServiceHttpAddress == null || context.config.metaServiceHttpAddress.isEmpty() || - context.config.instanceId == null || context.config.instanceId.isEmpty() || - context.config.metaServiceToken == null || context.config.metaServiceToken.isEmpty()) { - return false; + context.config.instanceId == null || context.config.instanceId.isEmpty() || + context.config.metaServiceToken == null || context.config.metaServiceToken.isEmpty()) { + return ret; } def getInstanceInfo = { check_func -> httpTest { @@ -1408,7 +1408,6 @@ class Suite implements GroovyInterceptable { check check_func } } - boolean enableStorageVault = false; getInstanceInfo.call() { respCode, body -> String respCodeValue = "${respCode}".toString(); @@ -1417,10 +1416,10 @@ class Suite implements GroovyInterceptable { } def json = parseJson(body) if (json.result.containsKey("enable_storage_vault") && json.result.enable_storage_vault) { - enableStorageVault = true; + ret = true; } } - return enableStorageVault; + return ret; } boolean isGroupCommitMode() { diff --git a/regression-test/suites/vaults/privilege_restart.groovy b/regression-test/suites/vaults/privilege_restart.groovy new file mode 100644 index 00000000000000..4e8c8fcc04dade --- /dev/null +++ b/regression-test/suites/vaults/privilege_restart.groovy @@ -0,0 +1,178 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.stream.Collectors; + +// This test suite is intent to test the granted privilege for specific user will +// not disappear +suite("test_privilege_vault_restart", "nonConcurrent") { + if (!enableStoragevault()) { + logger.info("skip test_privilege_vault_restart case") + return + } + + // user1 will be kept before and after running this test in order to check + // the granted vault privilege is persisted well eventhough FE restarts many times + def user1 = "test_privilege_vault_restart_user1" + def passwd = "Cloud12345" + + def vault1 = "test_privilege_vault_restart_vault1" + // this vaule is derived from current file location: regression-test/vaults + def db = "regression_test_vaults" + def table1 = "test_privilege_vault_restart_t1" + def table2 = "test_privilege_vault_restart_t2" + def hdfsLinkWeDontReallyCare = "127.0.0.1:10086" // a dummy link, it doesn't need to work + + //========================================================================== + // prepare the basic vault and tables for further check + //========================================================================== + sql """ + CREATE STORAGE VAULT IF NOT EXISTS ${vault1} + PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="${hdfsLinkWeDontReallyCare}", + "path_prefix" = "test_vault_privilege_restart" + ); + """ + + def storageVaults = (sql " SHOW STORAGE VAULT; ").stream().map(row -> row[0]).collect(Collectors.toSet()) + logger.info("all vaults: ${storageVaults}") + org.junit.Assert.assertTrue("${vault1} is not present after creating, all vaults: ${storageVaults}", storageVaults.contains(vault1)) + + def allTables = (sql " SHOW tables").stream().map(row -> row[0]).collect(Collectors.toSet()) + logger.info("all tables ${allTables}") + + // table1 is the sign to check if the user1 has been created and granted well + def targetTableExist = allTables.contains(table1) + + if (targetTableExist) { + // the grant procedure at least run once before, user1 has been granted vault1 + logger.info("${user1} has been granted with usage_priv to ${vault1} before") + } else { + logger.info("this is the frist run, or there was a crash during the very first run, ${user1} has not been granted with usage_priv to ${vault1} before") + // create user and grant storage vault and create a table with that vault + sql """drop user if exists ${user1}""" + sql """create user ${user1} identified by '${passwd}'""" + sql """ + GRANT usage_priv ON storage vault ${vault1} TO '${user1}'; + """ + sql """ + GRANT create_priv ON *.*.* TO '${user1}'; + """ + + // ATTN: create table1, if successful, the sign has been set + // there wont be any execuse that user1 misses the privilege to vault1 from now on + sql """ + CREATE TABLE IF NOT EXISTS ${table1} ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME INTEGER NOT NULL + ) + DUPLICATE KEY(C_CUSTKEY, C_NAME) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "storage_vault_name" = ${vault1} + ) + """ + } + + //========================================================================== + // check the prepared users and tables + //========================================================================== + def allUsers = (sql " SHOW all grants ").stream().map(row -> row[0]).collect(Collectors.toSet()) + logger.info("all users: ${allUsers}") + def userPresent = !(allUsers.stream().filter(i -> i.contains(user1)).collect(Collectors.toSet()).isEmpty()) + org.junit.Assert.assertTrue("${user1} is not in the priv table ${allUsers}", userPresent) + + allTables = (sql " SHOW tables").stream().map(row -> row[0]).collect(Collectors.toSet()) + logger.info("all tables: ${allTables}") + org.junit.Assert.assertTrue("${table1} is not present, all tables: ${allUsers}", allTables.contains(table1)) + + // Test user privilege, the newly created user cannot create or set default vault + // Only users with admin role can create storage vault + connect(user = user1, password = passwd, url = context.config.jdbcUrl) { + sql """use ${db}""" + expectExceptionLike({ + sql """ + CREATE STORAGE VAULT IF NOT EXISTS ${vault1} + PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="${hdfsLinkWeDontReallyCare}", + "path_prefix" = "test_vault_privilege" + ); + """ + }, "denied") + } + // Only users with admin role can set/unset default storage vault + connect(user = user1, password = passwd, url = context.config.jdbcUrl) { + sql """use ${db}""" + expectExceptionLike({ + sql """ + SET ${vault1} AS DEFAULT STORAGE VAULT + """ + }, "denied") + } + connect(user = user1, password = passwd, url = context.config.jdbcUrl) { + sql """use ${db}""" + expectExceptionLike({ + sql """ + UNSET DEFAULT STORAGE VAULT + """ + }, "denied") + } + + // user1 should see vault1 + def result = connect(user = user1, password = passwd, url = context.config.jdbcUrl) { + sql """use ${db}""" + sql " SHOW STORAGE VAULT; " + } + storageVaults = result.stream().map(row -> row[0]).collect(Collectors.toSet()) + org.junit.Assert.assertTrue("${user1} cannot see granted vault ${vault1} in result ${result}", storageVaults.contains(vault1)) + + + //========================================================================== + // to test that user1 has the privilege of vault1 to create new tables + // this is the main test for granted vault privilege after restarting FE + //========================================================================== + sql """ + DROP TABLE IF EXISTS ${table2} force; + """ + connect(user = user1, password = passwd, url = context.config.jdbcUrl) { + sql """use ${db}""" + sql """ + CREATE TABLE ${table2} ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME INTEGER NOT NULL + ) + DUPLICATE KEY(C_CUSTKEY, C_NAME) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "storage_vault_name" = ${vault1} + ) + """ + } + + result = connect(user = user1, password = passwd, url = context.config.jdbcUrl) { + sql """use ${db}""" + sql " SHOW create table ${table2}; " + } + logger.info("show create table ${table2}, result ${result}") + org.junit.Assert.assertTrue("missing storage vault properties ${vault1} in table ${table2}", result.toString().contains(vault1)) + +} From 9f46335476edb3c246d5bf60485cdb348033a618 Mon Sep 17 00:00:00 2001 From: Socrates Date: Wed, 4 Sep 2024 22:42:54 +0800 Subject: [PATCH 10/14] [fix](decompressor) consider the large_block_len is 0 (#40183) followup #38549 If the large_block_len is 0, should not continue reading the block_len. --- be/src/exec/decompressor.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/be/src/exec/decompressor.cpp b/be/src/exec/decompressor.cpp index 9102765458bdf1..a830eb3cbe70b5 100644 --- a/be/src/exec/decompressor.cpp +++ b/be/src/exec/decompressor.cpp @@ -468,7 +468,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t } std::size_t decompressed_large_block_len = 0; - do { + while (remaining_decompressed_large_block_len > 0) { // Check that input length should not be negative. if (input_len < sizeof(uint32_t)) { *more_input_bytes = sizeof(uint32_t) - input_len; @@ -505,8 +505,7 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t output_ptr += decompressed_small_block_len; remaining_decompressed_large_block_len -= decompressed_small_block_len; decompressed_large_block_len += decompressed_small_block_len; - - } while (remaining_decompressed_large_block_len > 0); + }; if (*more_input_bytes != 0) { // Need more input buffer @@ -586,7 +585,7 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, } std::size_t decompressed_large_block_len = 0; - do { + while (remaining_decompressed_large_block_len > 0) { // Check that input length should not be negative. if (input_len < sizeof(uint32_t)) { *more_input_bytes = sizeof(uint32_t) - input_len; @@ -630,8 +629,7 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len, output_ptr += decompressed_small_block_len; remaining_decompressed_large_block_len -= decompressed_small_block_len; decompressed_large_block_len += decompressed_small_block_len; - - } while (remaining_decompressed_large_block_len > 0); + }; if (*more_input_bytes != 0) { // Need more input buffer From 301bd04fc9adfceb2d7e391d63c5ff6347ad8472 Mon Sep 17 00:00:00 2001 From: Dongyang Li Date: Wed, 4 Sep 2024 23:48:18 +0800 Subject: [PATCH 11/14] [chore](ci) tmp remove required of Clang Formatter (#40392) For some unknown reason, all these checks failed. --- .asf.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.asf.yaml b/.asf.yaml index e71e55de23fc83..6ff16967c2e415 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -49,7 +49,6 @@ github: strict: false contexts: - License Check - - Clang Formatter - CheckStyle - P0 Regression (Doris Regression) - External Regression (Doris External Regression) @@ -87,7 +86,6 @@ github: strict: false contexts: - License Check - - Clang Formatter - CheckStyle - Build Broker - ShellCheck @@ -109,7 +107,6 @@ github: strict: false contexts: - License Check - - Clang Formatter - CheckStyle - P0 Regression (Doris Regression) - External Regression (Doris External Regression) From ae78c52fc996e82d73b76d5fc99939c9d56f1edc Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 4 Sep 2024 23:55:02 +0800 Subject: [PATCH 12/14] [fix](ga) try fix clang format action (#40393) Because distutils was removed in Python 3.12, we get ModuleNotFoundError: module named 'distutils' Use 3.10 --- .github/workflows/clang-format.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/clang-format.yml b/.github/workflows/clang-format.yml index ef39fa6c45fe11..f49f1987b9ed41 100644 --- a/.github/workflows/clang-format.yml +++ b/.github/workflows/clang-format.yml @@ -67,6 +67,11 @@ jobs: git checkout 6adbe14579e5b8e19eb3e31e5ff2479f3bd302c7 popd &>/dev/null + - name: Install Python dependencies + uses: actions/setup-python@v5 + with: + python-version: '3.10' # Adjust if needed + - name: "Format it!" if: ${{ steps.filter.outputs.changes == 'true' }} uses: ./.github/actions/clang-format-lint-action From 9faecb90a278c683ce9fc8c094d8f256de1a1387 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:10:47 +0800 Subject: [PATCH 13/14] [enhancement](show) Move storage policy info from show create table to show partitions (#39923) --- .../doris/catalog/ListPartitionInfo.java | 8 -- .../doris/catalog/RangePartitionInfo.java | 8 -- ...ow_create_table_with_storage_policy.groovy | 97 +++++++++++++++++++ 3 files changed, 97 insertions(+), 16 deletions(-) create mode 100644 regression-test/suites/show_p0/test_show_create_table_with_storage_policy.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java index da48ac980bf76a..4c5b38d682c940 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java @@ -30,7 +30,6 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.ListUtil; -import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.base.Preconditions; @@ -233,13 +232,6 @@ public String toSql(OlapTable table, List partitionId) { } sb.append(")"); - Optional.ofNullable(this.idToStoragePolicy.get(entry.getKey())).ifPresent(p -> { - if (!p.equals("")) { - sb.append(" (\"" + PropertyAnalyzer.PROPERTIES_STORAGE_POLICY + "\" = \""); - sb.append(p).append("\")"); - } - }); - if (partitionId != null) { partitionId.add(entry.getKey()); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java index 87e1d5d19b438b..6cf5545aacfc87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java @@ -27,7 +27,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.RangeUtils; import org.apache.doris.persist.gson.GsonUtils; @@ -283,13 +282,6 @@ public String toSql(OlapTable table, List partitionId) { sb.append(range.lowerEndpoint().toSql()); sb.append(", ").append(range.upperEndpoint().toSql()).append(")"); - Optional.ofNullable(this.idToStoragePolicy.get(entry.getKey())).ifPresent(p -> { - if (!p.equals("")) { - sb.append(" (\"" + PropertyAnalyzer.PROPERTIES_STORAGE_POLICY + "\" = \""); - sb.append(p).append("\")"); - } - }); - if (partitionId != null) { partitionId.add(entry.getKey()); break; diff --git a/regression-test/suites/show_p0/test_show_create_table_with_storage_policy.groovy b/regression-test/suites/show_p0/test_show_create_table_with_storage_policy.groovy new file mode 100644 index 00000000000000..1a276f09e6f906 --- /dev/null +++ b/regression-test/suites/show_p0/test_show_create_table_with_storage_policy.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_show_create_table_with_storage_policy") { + def tableName = "test_show_create_table_with_storage_policy" + def resourceName = "remote_hdfs" + def storagePolicyName = "ods_ods_invalid_events" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE RESOURCE "${resourceName}" PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="fs_host:default_fs_port", + "hadoop.username"="hive", + "hadoop.password"="hive", + "dfs.nameservices" = "my_ha", + "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", + "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", + "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + """ + + sql """ + CREATE STORAGE POLICY ${storagePolicyName} PROPERTIES ( + "storage_resource" = "${resourceName}", + "cooldown_ttl" = "300" + ); + """ + + sql """ + CREATE TABLE ${tableName} ( + `insert_date` date NULL COMMENT '对应ts_pretty,取yyyy-MM-dd', + `err_type` INT NULL COMMENT '错误类型', + `line` TEXT NULL COMMENT '数据列', + `received_at` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '接收到数据的事件' + ) ENGINE=OLAP + DUPLICATE KEY(`insert_date`, `err_type`) + COMMENT 'OLAP' + PARTITION BY RANGE(`insert_date`) + (PARTITION p20240806 VALUES [('2024-08-06'), ('2024-08-07')), + PARTITION p20240807 VALUES [('2024-08-07'), ('2024-08-08')), + PARTITION p20240808 VALUES [('2024-08-08'), ('2024-08-09')), + PARTITION p20240809 VALUES [('2024-08-09'), ('2024-08-10')), + PARTITION p20240810 VALUES [('2024-08-10'), ('2024-08-11')), + PARTITION p20240811 VALUES [('2024-08-11'), ('2024-08-12')), + PARTITION p20240812 VALUES [('2024-08-12'), ('2024-08-13')), + PARTITION p20240821 VALUES [('2024-08-21'), ('2024-08-22'))) + DISTRIBUTED BY HASH(`insert_date`, `err_type`) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "is_being_synced" = "false", + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.time_zone" = "Europe/London", + "dynamic_partition.start" = "-30", + "dynamic_partition.end" = "7", + "dynamic_partition.prefix" = "p", + "dynamic_partition.replication_allocation" = "tag.location.default: 1", + "dynamic_partition.buckets" = "2", + "dynamic_partition.create_history_partition" = "true", + "dynamic_partition.history_partition_num" = "8", + "dynamic_partition.hot_partition_num" = "0", + "dynamic_partition.reserved_history_periods" = "NULL", + "dynamic_partition.storage_policy" = "${storagePolicyName}", + "storage_format" = "V2", + "light_schema_change" = "true", + "disable_auto_compaction" = "false", + "enable_single_replica_compaction" = "false" + ); + """ + + def ret = sql """ SHOW CREATE TABLE ${tableName} """ + String createSql = ret[0][1] + + ret = sql """ SHOW PARTITIONS FROM ${tableName} """ + assertEquals(ret[9][12], storagePolicyName) + + sql """ DROP TABLE IF EXISTS ${tableName} """ + // create table successfully with stmt from show create table + sql createSql +} From 1849425cddf5eadb2f3b9ffbd41768472b6b3555 Mon Sep 17 00:00:00 2001 From: walter Date: Thu, 5 Sep 2024 10:35:41 +0800 Subject: [PATCH 14/14] [fix](restore) update is_being_synced properties (#40194) Some properties, like storage_policy, are not supported by the ccr syncer. This PR maintains those properties for both local and remote tables while restoring. --- .../org/apache/doris/backup/RestoreJob.java | 9 +++++--- .../org/apache/doris/catalog/OlapTable.java | 21 +++++++++++++------ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 12a30a97277995..44ccdd44f74c4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1812,7 +1812,7 @@ private Status allTabletCommitted(boolean isReplay) { // set all restored partition version and version hash // set all tables' state to NORMAL - setTableStateToNormal(db, true, isReplay); + setTableStateToNormalAndUpdateProperties(db, true, isReplay); for (long tblId : restoredVersionInfo.rowKeySet()) { Table tbl = db.getTableNullable(tblId); if (tbl == null) { @@ -2055,7 +2055,7 @@ public void cancelInternal(boolean isReplay) { Database db = env.getInternalCatalog().getDbNullable(dbId); if (db != null) { // rollback table's state to NORMAL - setTableStateToNormal(db, false, isReplay); + setTableStateToNormalAndUpdateProperties(db, false, isReplay); // remove restored tbls for (Table restoreTbl : restoredTbls) { @@ -2135,7 +2135,7 @@ public void cancelInternal(boolean isReplay) { LOG.info("finished to cancel restore job. is replay: {}. {}", isReplay, this); } - private void setTableStateToNormal(Database db, boolean committed, boolean isReplay) { + private void setTableStateToNormalAndUpdateProperties(Database db, boolean committed, boolean isReplay) { for (String tableName : jobInfo.backupOlapTableObjects.keySet()) { Table tbl = db.getTableNullable(jobInfo.getAliasByOriginNameIfSet(tableName)); if (tbl == null) { @@ -2178,6 +2178,9 @@ private void setTableStateToNormal(Database db, boolean committed, boolean isRep DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime()); } } + if (committed && isBeingSynced) { + olapTbl.setBeingSyncedProperties(); + } } finally { tbl.writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 533c24daa0e7bc..4bc05aff20612e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -737,17 +737,26 @@ public void resetPropertiesForRestore(boolean reserveDynamicPartitionEnable, boo tableProperty.resetPropertiesForRestore(reserveDynamicPartitionEnable, reserveReplica, replicaAlloc); } if (isBeingSynced) { - TableProperty tableProperty = getOrCreatTableProperty(); - tableProperty.setIsBeingSynced(); - tableProperty.removeInvalidProperties(); - if (isAutoBucket()) { - markAutoBucket(); - } + setBeingSyncedProperties(); } // remove colocate property. setColocateGroup(null); } + /** + * Set the related properties when is_being_synced properties is true. + * + * Some properties, like storage_policy, colocate_with, are not supported by the ccr syncer. + */ + public void setBeingSyncedProperties() { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.setIsBeingSynced(); + tableProperty.removeInvalidProperties(); + if (isAutoBucket()) { + markAutoBucket(); + } + } + public void resetVersionForRestore() { for (Partition partition : idToPartition.values()) { partition.setNextVersion(partition.getVisibleVersion() + 1);