
Commit 159be51

[bugfix](schema_change) Fix the coredump when doubly write during schema change (apache#22557)
Lchangliang authored Oct 19, 2023
1 parent e77b98b commit 159be51
Showing 13 changed files with 188 additions and 34 deletions.
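Reading the diff as a whole, the fix appears to work as follows: the BE used to resolve index columns against the tuple's slot descriptors by column name alone, and during a schema change the target index carries a shadow version of a column (the same logical name once the FE strips the shadow prefix, but a different type), so a name-only lookup could bind a slot whose type no longer matched the data being written. The commit has the FE send the non-shadow column name plus the column's primitive type in the slot descriptor, and keys the BE-side lookup by (lower-cased name, type). The snippet below is only a minimal, self-contained sketch of that composite-key lookup; the struct and function names are invented for illustration, and the real code uses an unordered_map over the pair (with the pair hash presumably supplied by util/hash_util.hpp), not std::map.

    // Minimal sketch of the (name, type) keyed slot lookup this commit introduces in
    // OlapTableSchemaParam::init. All names here (Slot, SlotKey, find_slot) are
    // illustrative stand-ins, not the real Doris types.
    #include <iostream>
    #include <map>
    #include <string>
    #include <utility>

    enum class PrimitiveType { TYPE_INT, TYPE_DOUBLE };

    struct Slot {
        std::string name;
        PrimitiveType type;
    };

    // Keyed by lower-cased column name *and* type: the original column and the shadow
    // column created by "ALTER TABLE ... MODIFY COLUMN" share a logical name but differ
    // in type, so a name-only map could hand back the wrong slot.
    using SlotKey = std::pair<std::string, PrimitiveType>;

    const Slot* find_slot(const std::map<SlotKey, Slot>& slots,
                          const std::string& lower_name, PrimitiveType type) {
        auto it = slots.find({lower_name, type});
        return it == slots.end() ? nullptr : &it->second;  // nullptr ~ "unknown index column"
    }

    int main() {
        std::map<SlotKey, Slot> slots;
        slots.emplace(SlotKey{"lo_custkey", PrimitiveType::TYPE_INT},
                      Slot{"lo_custkey", PrimitiveType::TYPE_INT});
        slots.emplace(SlotKey{"lo_custkey", PrimitiveType::TYPE_DOUBLE},
                      Slot{"lo_custkey", PrimitiveType::TYPE_DOUBLE});
        // While the schema change is running both versions can be addressed;
        // the type disambiguates them.
        std::cout << (find_slot(slots, "lo_custkey", PrimitiveType::TYPE_DOUBLE) != nullptr)
                  << "\n";
    }
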
69 changes: 38 additions & 31 deletions be/src/exec/tablet_info.cpp
@@ -32,9 +32,11 @@
#include "common/exception.h"
#include "common/status.h"
#include "olap/tablet_schema.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/large_int_value.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/primitive_type.h"
#include "runtime/raw_value.h"
#include "runtime/types.h"
#include "util/hash_util.hpp"
@@ -128,30 +130,33 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
for (auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
}
std::map<std::string, SlotDescriptor*> slots_map;
std::unordered_map<std::pair<std::string, std::string>, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));

for (auto& p_slot_desc : pschema.slot_descs()) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
_tuple_desc->add_slot(slot_desc);
slots_map.emplace(slot_desc->col_name(), slot_desc);
string data_type;
EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type);
slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), std::move(data_type)),
slot_desc);
}

for (auto& p_index : pschema.indexes()) {
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
for (auto& col : p_index.columns()) {
if (_is_partial_update && _partial_update_input_columns.count(col) == 0) {
continue;
}
auto it = slots_map.find(col);
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}", col);
}
index->slots.emplace_back(it->second);
}
for (auto& pcolumn_desc : p_index.columns_desc()) {
if (!_is_partial_update ||
_partial_update_input_columns.count(pcolumn_desc.name()) > 0) {
auto it = slots_map.find(
std::make_pair(to_lower(pcolumn_desc.name()), pcolumn_desc.type()));
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
}
index->slots.emplace_back(it->second);
}
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
@@ -183,41 +188,43 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
for (auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
}
std::map<std::string, SlotDescriptor*> slots_map;
std::unordered_map<std::pair<std::string, PrimitiveType>, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
for (auto& t_slot_desc : tschema.slot_descs) {
auto slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
_tuple_desc->add_slot(slot_desc);
slots_map.emplace(to_lower(slot_desc->col_name()), slot_desc);
slots_map.emplace(std::make_pair(to_lower(slot_desc->col_name()), slot_desc->col_type()),
slot_desc);
}

for (auto& t_index : tschema.indexes) {
std::unordered_map<std::string, SlotDescriptor*> index_slots_map;
auto index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (auto& col : t_index.columns) {
if (_is_partial_update && _partial_update_input_columns.count(col) == 0) {
continue;
}
auto it = slots_map.find(to_lower(col));
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column, column={}", col);
}
index->slots.emplace_back(it->second);
}
if (t_index.__isset.columns_desc) {
for (auto& tcolumn_desc : t_index.columns_desc) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
for (auto& tcolumn_desc : t_index.columns_desc) {
auto it = slots_map.find(std::make_pair(to_lower(tcolumn_desc.column_name),
thrift_to_type(tcolumn_desc.column_type.type)));
if (!_is_partial_update ||
_partial_update_input_columns.count(tcolumn_desc.column_name) > 0) {
if (it == slots_map.end()) {
return Status::InternalError("unknown index column, column={}, type={}",
tcolumn_desc.column_name,
tcolumn_desc.column_type.type);
}
index_slots_map.emplace(to_lower(tcolumn_desc.column_name), it->second);
index->slots.emplace_back(it->second);
}
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
}
if (t_index.__isset.indexes_desc) {
for (auto& tindex_desc : t_index.indexes_desc) {
std::vector<int32_t> column_unique_ids(tindex_desc.columns.size());
for (size_t i = 0; i < tindex_desc.columns.size(); i++) {
auto it = slots_map.find(to_lower(tindex_desc.columns[i]));
if (it != std::end(slots_map)) {
auto it = index_slots_map.find(to_lower(tindex_desc.columns[i]));
if (it != index_slots_map.end()) {
column_unique_ids[i] = it->second->col_unique_id();
}
}
3 changes: 3 additions & 0 deletions be/src/olap/tablet_schema.cpp
@@ -799,6 +799,9 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_indexes.clear();
_field_name_to_index.clear();
_field_id_to_index.clear();
_delete_sign_idx = -1;
_sequence_col_idx = -1;
_version_col_idx = -1;

for (auto& column : index->columns) {
if (column->is_key()) {
4 changes: 4 additions & 0 deletions be/src/olap/tablet_schema.h
@@ -143,6 +143,10 @@ class TabletColumn {
int32_t _unique_id = -1;
std::string _col_name;
std::string _col_name_lower_case;
// The field _type is converted from TPrimitiveType
// to a string via 'EnumToString(TPrimitiveType, tcolumn.column_type.type, data_type)' (see TabletMeta::init_column_from_tcolumn),
// and then to FieldType via 'TabletColumn::get_field_type_by_string' (see TabletColumn::init_from_pb).
// The _type in ColumnPB is also a string, produced from FieldType via 'get_string_by_field_type' (see TabletColumn::to_schema_pb).
FieldType _type;
bool _is_key = false;
FieldAggregationMethod _aggregation;
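The new comment above documents a conversion chain for the column type (TPrimitiveType enum to string, string to FieldType, and FieldType back to string when writing ColumnPB). As a rough, self-contained illustration of why each hop matters, the toy sketch below mimics that round trip with invented helpers; any type missing from the mapping degrades to an "unknown" value, which is the kind of silent mismatch such a chain can introduce.

    // Toy stand-ins for get_string_by_field_type / get_field_type_by_string; the real
    // functions live in TabletColumn and cover every FieldType. Enum values are invented.
    #include <string>
    #include <unordered_map>

    enum class FieldType { OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_DOUBLE, OLAP_FIELD_TYPE_UNKNOWN };

    static const std::unordered_map<FieldType, std::string> kFieldTypeNames = {
            {FieldType::OLAP_FIELD_TYPE_INT, "INT"},
            {FieldType::OLAP_FIELD_TYPE_DOUBLE, "DOUBLE"},
    };

    // FieldType -> string (the direction to_schema_pb needs when filling ColumnPB).
    std::string field_type_to_string(FieldType t) {
        auto it = kFieldTypeNames.find(t);
        return it == kFieldTypeNames.end() ? "UNKNOWN" : it->second;
    }

    // string -> FieldType (the direction init_from_pb needs when reading ColumnPB back).
    FieldType field_type_from_string(const std::string& s) {
        for (const auto& [type, name] : kFieldTypeNames) {
            if (name == s) {
                return type;
            }
        }
        return FieldType::OLAP_FIELD_TYPE_UNKNOWN;
    }
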
4 changes: 4 additions & 0 deletions be/src/runtime/descriptors.cpp
@@ -31,6 +31,7 @@
#include <memory>

#include "common/object_pool.h"
#include "runtime/primitive_type.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -59,6 +60,7 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_col_name(tdesc.colName),
_col_name_lower_case(to_lower(tdesc.colName)),
_col_unique_id(tdesc.col_unique_id),
_col_type(thrift_to_type(tdesc.primitive_type)),
_slot_idx(tdesc.slotIdx),
_field_idx(-1),
_is_materialized(tdesc.isMaterialized),
@@ -77,6 +79,7 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_col_name(pdesc.col_name()),
_col_name_lower_case(to_lower(pdesc.col_name())),
_col_unique_id(pdesc.col_unique_id()),
_col_type(static_cast<PrimitiveType>(pdesc.col_type())),
_slot_idx(pdesc.slot_idx()),
_field_idx(-1),
_is_materialized(pdesc.is_materialized()),
@@ -99,6 +102,7 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_col_unique_id(_col_unique_id);
pslot->set_is_key(_is_key);
pslot->set_is_auto_increment(_is_auto_increment);
pslot->set_col_type(_col_type);
}

vectorized::MutableColumnPtr SlotDescriptor::get_empty_mutable_column() const {
3 changes: 3 additions & 0 deletions be/src/runtime/descriptors.h
@@ -36,6 +36,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/global_types.h"
#include "common/status.h"
#include "runtime/define_primitive_type.h"
#include "runtime/types.h"
#include "vec/data_types/data_type.h"

@@ -113,6 +114,7 @@ class SlotDescriptor {
bool is_auto_increment() const { return _is_auto_increment; }

const std::string& col_default_value() const { return _col_default_value; }
PrimitiveType col_type() const { return _col_type; }

private:
friend class DescriptorTbl;
@@ -132,6 +134,7 @@ class SlotDescriptor {
const std::string _col_name_lower_case;

const int32_t _col_unique_id;
const PrimitiveType _col_type;

// the idx of the slot in the tuple descriptor (0-based).
// this is provided by the FE
6 changes: 6 additions & 0 deletions be/src/runtime/primitive_type.cpp
@@ -154,6 +154,7 @@ PrimitiveType thrift_to_type(TPrimitiveType::type ttype) {

case TPrimitiveType::VARIANT:
return TYPE_VARIANT;

default:
CHECK(false) << ", meet unknown type " << ttype;
return INVALID_TYPE;
@@ -259,6 +260,8 @@ TPrimitiveType::type to_thrift(PrimitiveType ptype) {
return TPrimitiveType::STRUCT;
case TYPE_LAMBDA_FUNCTION:
return TPrimitiveType::LAMBDA_FUNCTION;
case TYPE_AGG_STATE:
return TPrimitiveType::AGG_STATE;

default:
return TPrimitiveType::INVALID_TYPE;
@@ -365,6 +368,9 @@ std::string type_to_string(PrimitiveType t) {
case TYPE_LAMBDA_FUNCTION:
return "LAMBDA_FUNCTION TYPE";

case TYPE_VARIANT:
return "VARIANT";

default:
return "";
};
@@ -296,13 +296,14 @@ public boolean layoutEquals(SlotDescriptor other) {
public TSlotDescriptor toThrift() {
// Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1,
byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getName() : ""), slotIdx,
byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getNonShadowName() : ""), slotIdx,
isMaterialized);
tSlotDescriptor.setNeedMaterialize(needMaterialize);
tSlotDescriptor.setIsAutoIncrement(isAutoInc);
if (column != null) {
LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId());
LOG.debug("column name:{}, column unique id:{}", column.getNonShadowName(), column.getUniqueId());
tSlotDescriptor.setColUniqueId(column.getUniqueId());
tSlotDescriptor.setPrimitiveType(column.getDataType().toThrift());
tSlotDescriptor.setIsKey(column.isKey());
tSlotDescriptor.setColDefaultValue(column.getDefaultValue());
}
@@ -491,7 +491,7 @@ public int getOlapColumnIndexSize() {

public TColumn toThrift() {
TColumn tColumn = new TColumn();
tColumn.setColumnName(this.name);
tColumn.setColumnName(removeNamePrefix(this.name));

TColumnType tColumnType = new TColumnType();
tColumnType.setType(this.getDataType().toThrift());
1 change: 1 addition & 0 deletions gensrc/proto/descriptors.proto
@@ -37,6 +37,7 @@ message PSlotDescriptor {
optional int32 col_unique_id = 11;
optional bool is_key = 12;
optional bool is_auto_increment = 13;
optional int32 col_type = 14 [default = 0];
};

message PTupleDescriptor {
1 change: 1 addition & 0 deletions gensrc/thrift/Descriptors.thrift
@@ -63,6 +63,7 @@ struct TSlotDescriptor {
// subcolumn path info list for semi structure column(variant)
15: optional list<string> column_paths
16: optional string col_default_value
17: optional Types.TPrimitiveType primitive_type = Types.TPrimitiveType.INVALID_TYPE
}

struct TTupleDescriptor {
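Taken together, the proto and thrift changes above add a type field to the slot descriptor on both transport paths: the FE fills TSlotDescriptor.primitive_type (field 17, defaulting to INVALID_TYPE), the BE constructor converts it with thrift_to_type, and SlotDescriptor::to_protobuf forwards it as PSlotDescriptor.col_type (field 14, default 0) so protobuf-serialized descriptors keep the type as well. The fragment below is only a toy round trip of that enum-as-int32 encoding, with simplified stand-in types and made-up enum values.

    // Toy round trip of the enum-as-int32 encoding used by the new PSlotDescriptor.col_type
    // field. PrimitiveType values and the struct below are simplified stand-ins.
    #include <cassert>
    #include <cstdint>

    enum PrimitiveType : int32_t { INVALID_TYPE = 0, TYPE_INT = 5, TYPE_DOUBLE = 8 };

    struct PSlotDescriptorLike {
        int32_t col_type = 0;  // optional field, default 0 == INVALID_TYPE for old senders
    };

    void write_col_type(PrimitiveType type, PSlotDescriptorLike* pslot) {
        pslot->col_type = type;  // stored as a plain int32, as in SlotDescriptor::to_protobuf
    }

    PrimitiveType read_col_type(const PSlotDescriptorLike& pslot) {
        return static_cast<PrimitiveType>(pslot.col_type);  // as in the PSlotDescriptor ctor
    }

    int main() {
        PSlotDescriptorLike pslot;
        write_col_type(TYPE_DOUBLE, &pslot);
        assert(read_col_type(pslot) == TYPE_DOUBLE);
        return 0;
    }
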
24 changes: 24 additions & 0 deletions regression-test/suites/schema_change/ddl/lineorder_create.sql
@@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS `lineorder` (
`lo_orderkey` bigint(20) NOT NULL COMMENT "",
`lo_linenumber` bigint(20) NOT NULL COMMENT "",
`lo_custkey` int(11) NOT NULL COMMENT "",
`lo_partkey` int(11) NOT NULL COMMENT "",
`lo_suppkey` int(11) NOT NULL COMMENT "",
`lo_orderdate` int(11) NOT NULL COMMENT "",
`lo_orderpriority` varchar(16) NOT NULL COMMENT "",
`lo_shippriority` int(11) NOT NULL COMMENT "",
`lo_quantity` bigint(20) NOT NULL COMMENT "",
`lo_extendedprice` bigint(20) NOT NULL COMMENT "",
`lo_ordtotalprice` bigint(20) NOT NULL COMMENT "",
`lo_discount` bigint(20) NOT NULL COMMENT "",
`lo_revenue` bigint(20) NOT NULL COMMENT "",
`lo_supplycost` bigint(20) NOT NULL COMMENT "",
`lo_tax` bigint(20) NOT NULL COMMENT "",
`lo_commitdate` bigint(20) NOT NULL COMMENT "",
`lo_shipmode` varchar(11) NOT NULL COMMENT ""
)
DUPLICATE KEY (`lo_orderkey`, `lo_linenumber`)
DISTRIBUTED BY HASH(`lo_orderkey`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
@@ -0,0 +1 @@
drop table IF EXISTS lineorder;
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Most of the cases are copied from https://github.com/trinodb/trino/tree/master
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases
// and modified by Doris.

// Note: To filter out tables from sql files, use the following one-liner command
// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | sort | uniq
suite("double_write_schema_change") {

// This suite loads the SSB lineorder data (bucket num set to 1 so that many rowsets
// are created during loading), then triggers a schema change and keeps inserting
// while it is running, to exercise the double-write path during the schema change.

def tableName = "lineorder"
def columns = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,
lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,
lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy"""

sql new File("""${context.file.parent}/ddl/${tableName}_delete.sql""").text
sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text

streamLoad {
// a default db 'regression_test' is specified in
// ${DORIS_HOME}/conf/regression-conf.groovy
table tableName

// default label is UUID:
// set 'label' UUID.randomUUID().toString()

// The default column_separator is specified in the Doris FE config, usually '\t'.
// This line changes it to '|'.
set 'column_separator', '|'
set 'compress_type', 'GZ'
set 'columns', columns


// The data file is fetched from S3 via getS3Url() below.
// You can also stream load an HTTP stream, e.g. http://xxx/some.csv
file """${getS3Url()}/regression/ssb/sf1/${tableName}.tbl.gz"""

time 10000 // limit inflight 10s

// The stream load action checks the result by default, including the Success status
// and NumberTotalRows == NumberLoadedRows.

// If a check callback is declared, the default checks are skipped,
// so you must verify all conditions yourself.
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}

def getJobState = { indexName ->
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${indexName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}

def insert_sql = """ insert into ${tableName} values(100000000, 1, 1, 1, 1, 1, "1", 1, 1, 1, 1, 1, 1, 1, 1, 1, "1") """

sql """ ALTER TABLE ${tableName} modify COLUMN lo_custkey double"""
int max_try_time = 3000
while (max_try_time--){
String result = getJobState(tableName)
if (result == "FINISHED") {
sleep(3000)
break
} else {
if (result == "RUNNING") {
sql insert_sql
}
sleep(100)
if (max_try_time < 1){
assertEquals(1,2)
}
}
}
}
