diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 3abeecbfe2824a..05561be2971305 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -32,9 +32,11 @@ #include "common/exception.h" #include "common/status.h" #include "olap/tablet_schema.h" +#include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/large_int_value.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/primitive_type.h" #include "runtime/raw_value.h" #include "runtime/types.h" #include "util/hash_util.hpp" @@ -128,30 +130,33 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { for (auto& col : pschema.partial_update_input_columns()) { _partial_update_input_columns.insert(col); } - std::map slots_map; + std::unordered_map, SlotDescriptor*> slots_map; _tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc())); for (auto& p_slot_desc : pschema.slot_descs()) { auto slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc)); _tuple_desc->add_slot(slot_desc); - slots_map.emplace(slot_desc->col_name(), slot_desc); + string data_type; + EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type); + slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), std::move(data_type)), + slot_desc); } for (auto& p_index : pschema.indexes()) { auto index = _obj_pool.add(new OlapTableIndexSchema()); index->index_id = p_index.id(); index->schema_hash = p_index.schema_hash(); - for (auto& col : p_index.columns()) { - if (_is_partial_update && _partial_update_input_columns.count(col) == 0) { - continue; - } - auto it = slots_map.find(col); - if (it == std::end(slots_map)) { - return Status::InternalError("unknown index column, column={}", col); - } - index->slots.emplace_back(it->second); - } for (auto& pcolumn_desc : p_index.columns_desc()) { + if (!_is_partial_update || + _partial_update_input_columns.count(pcolumn_desc.name()) > 0) { + auto it = slots_map.find( + std::make_pair(to_lower(pcolumn_desc.name()), pcolumn_desc.type())); + if (it == std::end(slots_map)) { + return Status::InternalError("unknown index column, column={}, type={}", + pcolumn_desc.name(), pcolumn_desc.type()); + } + index->slots.emplace_back(it->second); + } TabletColumn* tc = _obj_pool.add(new TabletColumn()); tc->init_from_pb(pcolumn_desc); index->columns.emplace_back(tc); @@ -183,41 +188,43 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { for (auto& tcolumn : tschema.partial_update_input_columns) { _partial_update_input_columns.insert(tcolumn); } - std::map slots_map; + std::unordered_map, SlotDescriptor*> slots_map; _tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc)); for (auto& t_slot_desc : tschema.slot_descs) { auto slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc)); _tuple_desc->add_slot(slot_desc); - slots_map.emplace(to_lower(slot_desc->col_name()), slot_desc); + slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), slot_desc->col_type()), + slot_desc); } for (auto& t_index : tschema.indexes) { + std::unordered_map index_slots_map; auto index = _obj_pool.add(new OlapTableIndexSchema()); index->index_id = t_index.id; index->schema_hash = t_index.schema_hash; - for (auto& col : t_index.columns) { - if (_is_partial_update && _partial_update_input_columns.count(col) == 0) { - continue; - } - auto it = slots_map.find(to_lower(col)); - if (it == std::end(slots_map)) { - return Status::InternalError("unknown index column, column={}", col); - } - index->slots.emplace_back(it->second); - } - if (t_index.__isset.columns_desc) { - for (auto& tcolumn_desc : t_index.columns_desc) { - TabletColumn* tc = _obj_pool.add(new TabletColumn()); - tc->init_from_thrift(tcolumn_desc); - index->columns.emplace_back(tc); + for (auto& tcolumn_desc : t_index.columns_desc) { + auto it = slots_map.find(std::make_pair(to_lower(tcolumn_desc.column_name), + thrift_to_type(tcolumn_desc.column_type.type))); + if (!_is_partial_update || + _partial_update_input_columns.count(tcolumn_desc.column_name) > 0) { + if (it == slots_map.end()) { + return Status::InternalError("unknown index column, column={}, type={}", + tcolumn_desc.column_name, + tcolumn_desc.column_type.type); + } + index_slots_map.emplace(to_lower(tcolumn_desc.column_name), it->second); + index->slots.emplace_back(it->second); } + TabletColumn* tc = _obj_pool.add(new TabletColumn()); + tc->init_from_thrift(tcolumn_desc); + index->columns.emplace_back(tc); } if (t_index.__isset.indexes_desc) { for (auto& tindex_desc : t_index.indexes_desc) { std::vector column_unique_ids(tindex_desc.columns.size()); for (size_t i = 0; i < tindex_desc.columns.size(); i++) { - auto it = slots_map.find(to_lower(tindex_desc.columns[i])); - if (it != std::end(slots_map)) { + auto it = index_slots_map.find(to_lower(tindex_desc.columns[i])); + if (it != index_slots_map.end()) { column_unique_ids[i] = it->second->col_unique_id(); } } diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index c10074c47cd2b7..833ad15a15d3ff 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -799,6 +799,9 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version _indexes.clear(); _field_name_to_index.clear(); _field_id_to_index.clear(); + _delete_sign_idx = -1; + _sequence_col_idx = -1; + _version_col_idx = -1; for (auto& column : index->columns) { if (column->is_key()) { diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 42fabc6c2f64f4..2fe6ea45581582 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -143,6 +143,10 @@ class TabletColumn { int32_t _unique_id = -1; std::string _col_name; std::string _col_name_lower_case; + // the field _type will change from TPrimitiveType + // to string by 'EnumToString(TPrimitiveType, tcolumn.column_type.type, data_type);' (reference: TabletMeta::init_column_from_tcolumn) + // to FieldType by 'TabletColumn::get_field_type_by_string' (reference: TabletColumn::init_from_pb). + // And the _type in columnPB is string and it changed from FieldType by 'get_string_by_field_type' (reference: TabletColumn::to_schema_pb). FieldType _type; bool _is_key = false; FieldAggregationMethod _aggregation; diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index add28582d894b6..b5c23a2afd0a06 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -31,6 +31,7 @@ #include #include "common/object_pool.h" +#include "runtime/primitive_type.h" #include "util/string_util.h" #include "vec/aggregate_functions/aggregate_function.h" #include "vec/data_types/data_type_factory.hpp" @@ -59,6 +60,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc) _col_name(tdesc.colName), _col_name_lower_case(to_lower(tdesc.colName)), _col_unique_id(tdesc.col_unique_id), + _col_type(thrift_to_type(tdesc.primitive_type)), _slot_idx(tdesc.slotIdx), _field_idx(-1), _is_materialized(tdesc.isMaterialized), @@ -77,6 +79,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc) _col_name(pdesc.col_name()), _col_name_lower_case(to_lower(pdesc.col_name())), _col_unique_id(pdesc.col_unique_id()), + _col_type(static_cast(pdesc.col_type())), _slot_idx(pdesc.slot_idx()), _field_idx(-1), _is_materialized(pdesc.is_materialized()), @@ -99,6 +102,7 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const { pslot->set_col_unique_id(_col_unique_id); pslot->set_is_key(_is_key); pslot->set_is_auto_increment(_is_auto_increment); + pslot->set_col_type(_col_type); } vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const { diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 5483ecde52dec3..f77f5fec3b99be 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -36,6 +36,7 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/global_types.h" #include "common/status.h" +#include "runtime/define_primitive_type.h" #include "runtime/types.h" #include "vec/data_types/data_type.h" @@ -113,6 +114,7 @@ class SlotDescriptor { bool is_auto_increment() const { return _is_auto_increment; } const std::string& col_default_value() const { return _col_default_value; } + PrimitiveType col_type() const { return _col_type; } private: friend class DescriptorTbl; @@ -132,6 +134,7 @@ class SlotDescriptor { const std::string _col_name_lower_case; const int32_t _col_unique_id; + const PrimitiveType _col_type; // the idx of the slot in the tuple descriptor (0-based). // this is provided by the FE diff --git a/be/src/runtime/primitive_type.cpp b/be/src/runtime/primitive_type.cpp index 82a189107d9301..56ac616014b4d0 100644 --- a/be/src/runtime/primitive_type.cpp +++ b/be/src/runtime/primitive_type.cpp @@ -154,6 +154,7 @@ PrimitiveType thrift_to_type(TPrimitiveType::type ttype) { case TPrimitiveType::VARIANT: return TYPE_VARIANT; + default: CHECK(false) << ", meet unknown type " << ttype; return INVALID_TYPE; @@ -259,6 +260,8 @@ TPrimitiveType::type to_thrift(PrimitiveType ptype) { return TPrimitiveType::STRUCT; case TYPE_LAMBDA_FUNCTION: return TPrimitiveType::LAMBDA_FUNCTION; + case TYPE_AGG_STATE: + return TPrimitiveType::AGG_STATE; default: return TPrimitiveType::INVALID_TYPE; @@ -365,6 +368,9 @@ std::string type_to_string(PrimitiveType t) { case TYPE_LAMBDA_FUNCTION: return "LAMBDA_FUNCTION TYPE"; + case TYPE_VARIANT: + return "VARIANT"; + default: return ""; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java index 42ba32aef41166..6384dad8d7b9d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java @@ -296,13 +296,14 @@ public boolean layoutEquals(SlotDescriptor other) { public TSlotDescriptor toThrift() { // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1, - byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getName() : ""), slotIdx, + byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getNonShadowName() : ""), slotIdx, isMaterialized); tSlotDescriptor.setNeedMaterialize(needMaterialize); tSlotDescriptor.setIsAutoIncrement(isAutoInc); if (column != null) { - LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId()); + LOG.debug("column name:{}, column unique id:{}", column.getNonShadowName(), column.getUniqueId()); tSlotDescriptor.setColUniqueId(column.getUniqueId()); + tSlotDescriptor.setPrimitiveType(column.getDataType().toThrift()); tSlotDescriptor.setIsKey(column.isKey()); tSlotDescriptor.setColDefaultValue(column.getDefaultValue()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 6de1b5a9d410d5..d9a0ed51badd99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -491,7 +491,7 @@ public int getOlapColumnIndexSize() { public TColumn toThrift() { TColumn tColumn = new TColumn(); - tColumn.setColumnName(this.name); + tColumn.setColumnName(removeNamePrefix(this.name)); TColumnType tColumnType = new TColumnType(); tColumnType.setType(this.getDataType().toThrift()); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 99101767644389..aeab7ace7c292d 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -37,6 +37,7 @@ message PSlotDescriptor { optional int32 col_unique_id = 11; optional bool is_key = 12; optional bool is_auto_increment = 13; + optional int32 col_type = 14 [default = 0]; }; message PTupleDescriptor { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index c5425ecfa3c717..21bea8cac59dae 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -63,6 +63,7 @@ struct TSlotDescriptor { // subcolumn path info list for semi structure column(variant) 15: optional list column_paths 16: optional string col_default_value + 17: optional Types.TPrimitiveType primitive_type = Types.TPrimitiveType.INVALID_TYPE } struct TTupleDescriptor { diff --git a/regression-test/suites/schema_change/ddl/lineorder_create.sql b/regression-test/suites/schema_change/ddl/lineorder_create.sql new file mode 100644 index 00000000000000..442262821981da --- /dev/null +++ b/regression-test/suites/schema_change/ddl/lineorder_create.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS `lineorder` ( + `lo_orderkey` bigint(20) NOT NULL COMMENT "", + `lo_linenumber` bigint(20) NOT NULL COMMENT "", + `lo_custkey` int(11) NOT NULL COMMENT "", + `lo_partkey` int(11) NOT NULL COMMENT "", + `lo_suppkey` int(11) NOT NULL COMMENT "", + `lo_orderdate` int(11) NOT NULL COMMENT "", + `lo_orderpriority` varchar(16) NOT NULL COMMENT "", + `lo_shippriority` int(11) NOT NULL COMMENT "", + `lo_quantity` bigint(20) NOT NULL COMMENT "", + `lo_extendedprice` bigint(20) NOT NULL COMMENT "", + `lo_ordtotalprice` bigint(20) NOT NULL COMMENT "", + `lo_discount` bigint(20) NOT NULL COMMENT "", + `lo_revenue` bigint(20) NOT NULL COMMENT "", + `lo_supplycost` bigint(20) NOT NULL COMMENT "", + `lo_tax` bigint(20) NOT NULL COMMENT "", + `lo_commitdate` bigint(20) NOT NULL COMMENT "", + `lo_shipmode` varchar(11) NOT NULL COMMENT "" +) +DUPLICATE KEY (`lo_orderkey`, `lo_linenumber`) +DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 1 +PROPERTIES ( +"replication_num" = "1" +); diff --git a/regression-test/suites/schema_change/ddl/lineorder_delete.sql b/regression-test/suites/schema_change/ddl/lineorder_delete.sql new file mode 100644 index 00000000000000..2c1c2fa57d99ad --- /dev/null +++ b/regression-test/suites/schema_change/ddl/lineorder_delete.sql @@ -0,0 +1 @@ +drop table IF EXISTS lineorder; diff --git a/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy b/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy new file mode 100644 index 00000000000000..43a9bc2b349a96 --- /dev/null +++ b/regression-test/suites/schema_change/test_double_write_when_schema_change.groovy @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Most of the cases are copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. + +// Note: To filter out tables from sql files, use the following one-liner comamnd +// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | sort | uniq +suite("double_write_schema_change") { + + // ssb_sf1_p1 is writted to test unique key table merge correctly. + // It creates unique key table and sets bucket num to 1 in order to make sure that + // many rowsets will be created during loading and then the merge process will be triggered. + + def tableName = "lineorder" + def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, + lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, + lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" + + sql new File("""${context.file.parent}/ddl/${tableName}_delete.sql""").text + sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text + + streamLoad { + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + table tableName + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + set 'columns', columns + + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/ssb/sf1/${tableName}.tbl.gz""" + + time 10000 // limit inflight 10s + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + + def getJobState = { indexName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def insert_sql = """ insert into ${tableName} values(100000000, 1, 1, 1, 1, 1, "1", 1, 1, 1, 1, 1, 1, 1, 1, 1, "1") """ + + sql """ ALTER TABLE ${tableName} modify COLUMN lo_custkey double""" + int max_try_time = 3000 + while (max_try_time--){ + String result = getJobState(tableName) + if (result == "FINISHED") { + sleep(3000) + break + } else { + if (result == "RUNNING") { + sql insert_sql + } + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } +}