From d95ac3205c11e1111648460e76b0fbf0c3d04b0f Mon Sep 17 00:00:00 2001
From: meiyi
Date: Thu, 18 Apr 2024 20:37:55 +0800
Subject: [PATCH 1/5] [fix](txn insert) Fix txn insert commit failed when
 schema change (#33706)

---
 .../doris/planner/StreamLoadPlanner.java      |   4 +
 .../doris/qe/InsertStreamTxnExecutor.java     | 153 +++++++++---------
 .../txn_insert_values_with_schema_change.out  |  13 ++
 ...xn_insert_values_with_schema_change.groovy | 109 +++++++++++++
 4 files changed, 202 insertions(+), 77 deletions(-)
 create mode 100644 regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
 create mode 100644 regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index ac93d18243ee6f..53576aa7e9c604 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -112,10 +112,12 @@ public OlapTable getDestTable() {
         return destTable;
     }
 
+    // the caller should hold the table read lock when calling this method
     public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
         return this.plan(loadId, 1);
     }
 
+    // the caller should hold the table read lock when calling this method
     // create the plan. the plan's query id and load id are same, using the parameter 'loadId'
     public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
@@ -344,11 +346,13 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
         return params;
     }
 
+    // the caller should hold the table read lock when calling this method
     // single table plan fragmentInstanceIndex is 1(default value)
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws UserException {
         return this.planForPipeline(loadId, 1);
     }
 
+    // the caller should hold the table read lock when calling this method
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
                 && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index f37457cf58d457..e89553e672dab1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -43,6 +43,7 @@
 import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionState;
 
 import org.apache.thrift.TException;
 
@@ -65,92 +66,90 @@ public InsertStreamTxnExecutor(TransactionEntry txnEntry) {
     public void beginTransaction(TStreamLoadPutRequest request)
             throws UserException, TException, TimeoutException, InterruptedException, ExecutionException {
         TTxnParams txnConf = txnEntry.getTxnConf();
+        OlapTable table = (OlapTable) txnEntry.getTable();
         // StreamLoadTask's id == request's load_id
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
-        StreamLoadPlanner planner = new StreamLoadPlanner(
-                (Database) txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
-        // Will using load id as query id in fragment
-        if (Config.enable_pipeline_load) {
-            TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId());
-            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-            List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-            if (beIds.isEmpty()) {
-                throw new UserException("No available backend to match the policy: " + policy);
-            }
-
-            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
-            for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0)
-                    .per_node_scan_ranges.entrySet()) {
-                for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                            TFileFormatType.FORMAT_PROTO);
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                            TFileCompressType.PLAIN);
+        StreamLoadPlanner planner = new StreamLoadPlanner((Database) txnEntry.getDb(), table, streamLoadTask);
+        boolean enablePipelineLoad = Config.enable_pipeline_load;
+        TPipelineFragmentParamsList pipelineParamsList = new TPipelineFragmentParamsList();
+        TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+        if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
+            throw new UserException("get table read lock timeout, database=" + table.getDatabase().getId() + ",table="
+                    + table.getName());
+        }
+        try {
+            // Will using load id as query id in fragment
+            if (enablePipelineLoad) {
+                TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId());
+                tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+                for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0)
+                        .per_node_scan_ranges.entrySet()) {
+                    for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                                TFileFormatType.FORMAT_PROTO);
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                                TFileCompressType.PLAIN);
+                    }
                 }
-            }
-            txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
-            this.loadId = request.getLoadId();
-            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
-                    .setHi(loadId.getHi())
-                    .setLo(loadId.getLo()).build());
-
-            Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
-            txnConf.setUserIp(backend.getHost());
-            txnEntry.setBackend(backend);
-            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-            try {
-                TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
-                paramsList.addToParamsList(tRequest);
-                Future<InternalService.PExecPlanFragmentResult> future =
-                        BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
-                InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
-                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+                txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
+                this.loadId = request.getLoadId();
+                this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                        .setHi(loadId.getHi())
+                        .setLo(loadId.getLo()).build());
+
+                pipelineParamsList.addToParamsList(tRequest);
+            } else {
+                TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+                tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+                for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges
+                        .entrySet()) {
+                    for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                                TFileFormatType.FORMAT_PROTO);
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                                TFileCompressType.PLAIN);
+                    }
                 }
-            } catch (RpcException e) {
-                throw new TException(e);
+                txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+                this.loadId = request.getLoadId();
+                this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                        .setHi(loadId.getHi())
+                        .setLo(loadId.getLo()).build());
+                paramsList.addToParamsList(tRequest);
             }
-        } else {
-            TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
-            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-            List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-            if (beIds.isEmpty()) {
-                throw new UserException("No available backend to match the policy: " + policy);
+            TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
+                    .getTransactionState(table.getDatabase().getId(), streamLoadTask.getTxnId());
+            if (transactionState != null) {
+                transactionState.addTableIndexes(table);
             }
+        } finally {
+            table.readUnlock();
+        }
 
-            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
-            for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
-                for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                            TFileFormatType.FORMAT_PROTO);
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                            TFileCompressType.PLAIN);
-                }
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+        List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        if (beIds.isEmpty()) {
+            throw new UserException("No available backend to match the policy: " + policy);
+        }
+
+        Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+        txnConf.setUserIp(backend.getHost());
+        txnEntry.setBackend(backend);
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        try {
+            Future<InternalService.PExecPlanFragmentResult> future;
+            if (enablePipelineLoad) {
+                future = BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, pipelineParamsList, false);
+            } else {
+                future = BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
             }
-            txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
-            this.loadId = request.getLoadId();
-            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
-                    .setHi(loadId.getHi())
-                    .setLo(loadId.getLo()).build());
-
-            Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
-            txnConf.setUserIp(backend.getHost());
-            txnEntry.setBackend(backend);
-            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-            try {
-                TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
-                paramsList.addToParamsList(tRequest);
-                Future<InternalService.PExecPlanFragmentResult> future =
-                        BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
-                InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
-                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
-                }
-            } catch (RpcException e) {
-                throw new TException(e);
+            InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
             }
+        } catch (RpcException e) {
+            throw new TException(e);
         }
     }
diff --git a/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out b/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
new file mode 100644
index 00000000000000..9e1016fe0e26e7
--- /dev/null
+++ b/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select1 --
+0	a	10
+1	b	20
+2	c	30
+3	d	40
+
+-- !select2 --
+0	a	10
+1	b	20
+2	c	30
+3	d	40
+
diff --git a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy b/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
new file mode 100644
index 00000000000000..cd428b185c4179
--- /dev/null
+++ b/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_values_with_schema_change") {
+    def table = "txn_insert_values_with_schema_change"
+
+    def dbName = "regression_test_insert_p0"
+    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+    logger.info("url: ${url}")
+    List<String> errors = new ArrayList<>()
+    CountDownLatch insertLatch = new CountDownLatch(1)
+    CountDownLatch schemaChangeLatch = new CountDownLatch(1)
+
+    sql """ DROP TABLE IF EXISTS $table force """
+    sql """
+        create table $table (
+            `ID` int(11) NOT NULL,
+            `NAME` varchar(100) NULL,
+            `score` int(11) NULL
+        ) ENGINE=OLAP
+        duplicate KEY(`id`)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """ insert into ${table} values(0, 'a', 10) """
+
+    def getAlterTableState = { job_state ->
+        def retry = 0
+        sql "use ${dbName};"
+        while (true) {
+            sleep(2000)
+            def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1"
+            logger.info("alter table state: ${state}")
+            if (state.size() > 0 && state[0][9] == job_state) {
+                return
+            }
+            retry++
+            if (retry >= 10) {
+                break
+            }
+        }
+        Assert.fail("alter table job state is not ${job_state} after retry 10 times")
+    }
+
+    def txnInsert = {
+        try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
+             Statement statement = conn.createStatement()) {
+            statement.execute("SET enable_nereids_planner = true")
+            statement.execute("SET enable_fallback_to_original_planner = false")
+            statement.execute("begin")
+            statement.execute("insert into ${table} values(1, 'b', 20), (2, 'c', 30);")
+
+            schemaChangeLatch.countDown()
+            insertLatch.await(2, TimeUnit.MINUTES)
+
+            statement.execute("insert into ${table} values(3, 'd', 40);")
+            statement.execute("commit")
+        } catch (Throwable e) {
+            logger.error("txn insert failed", e)
+            errors.add("txn insert failed " + e.getMessage())
+        }
+    }
+
+    def schemaChange = { sql, job_state ->
+        try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
+             Statement statement = conn.createStatement()) {
+            schemaChangeLatch.await(2, TimeUnit.MINUTES)
+            statement.execute(sql)
+            getAlterTableState(job_state)
+            insertLatch.countDown()
+        } catch (Throwable e) {
+            logger.error("schema change failed", e)
+            errors.add("schema change failed " + e.getMessage())
+        }
+    }
+
+    Thread insert_thread = new Thread(() -> txnInsert())
+    Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} order by (id, score, name);", "WAITING_TXN"))
+    insert_thread.start()
+    schema_change_thread.start()
+    insert_thread.join()
+    schema_change_thread.join()
+
+    logger.info("errors: " + errors)
+    assertEquals(0, errors.size())
+    order_qt_select1 """select id, name, score from ${table} """
+    getAlterTableState("FINISHED")
+    order_qt_select2 """select id, name, score from ${table} """
+}
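The essence of the patch above: plan generation now happens while the table read lock is held, so a concurrent schema change (which needs the write lock) cannot alter the schema mid-plan, and the table's indexes are registered with the TransactionState before the lock is released. Below is a minimal, self-contained Java sketch of that lock-then-plan pattern; all names here are illustrative stand-ins, not Doris classes.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; TableLockDemo and planUnderReadLock are not Doris APIs.
class TableLockDemo {
    private static final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    // Mirrors the patch's pattern: plan only while holding the read lock, so a
    // concurrent schema change (which must take the write lock) cannot swap the
    // schema out from under the plan.
    static String planUnderReadLock(String tableName) throws Exception {
        if (!rwLock.readLock().tryLock(1, TimeUnit.MINUTES)) {
            throw new Exception("get table read lock timeout, table=" + tableName);
        }
        try {
            return "plan(" + tableName + ")"; // stand-in for StreamLoadPlanner.plan()
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(planUnderReadLock("t"));
    }
}

Note the design choice the patch also makes: backend selection and the RPC happen after the lock is released, keeping the critical section as short as possible.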
From 9d9c080bd0983188b5436f55b2e73d43aadfe1f2 Mon Sep 17 00:00:00 2001
From: zhannngchen <48427519+zhannngchen@users.noreply.github.com>
Date: Thu, 18 Apr 2024 20:40:06 +0800
Subject: [PATCH 2/5] [enhancement](partial-update) print more log while
 missed some rowsets (#33711)

---
 be/src/olap/rowset/segment_v2/segment_writer.cpp  | 15 ++++++++-------
 .../rowset/segment_v2/vertical_segment_writer.cpp | 15 ++++++++-------
 2 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 0d841d0ec095a2..f6be1917e57840 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -453,19 +453,20 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
                 !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
         specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
             LOG(WARNING) << fmt::format(
                     "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in strict "
-                    "mode partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
+                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
+                    "partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
                     specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
                     _mow_context->max_version, _mow_context->txn_id);
-            return Status::InternalError(
-                    "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction in strict mode partial update");
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                return Status::InternalError(
+                        "[Memtable Flush] some rowsets have been deleted due to "
+                        "compaction in strict mode partial update");
+            }
         }
     }
     std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index db10e208ff4426..8f7bb9fe245f0e 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -386,19 +386,20 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
                 !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
         specified_rowsets = tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
             LOG(WARNING) << fmt::format(
                     "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in strict "
-                    "mode partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
+                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
+                    "partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
                     specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
                     _mow_context->max_version, _mow_context->txn_id);
-            return Status::InternalError(
-                    "[Memtable Flush] some rowsets have been deleted due to "
-                    "compaction in strict mode partial update");
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                return Status::InternalError(
+                        "[Memtable Flush] some rowsets have been deleted due to "
+                        "compaction in strict mode partial update");
+            }
         }
     }
     std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
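The behavioral change in the patch above is small but easy to miss: the missing-rowset warning is now logged unconditionally, while the hard error is raised only for strict-mode partial updates. A compact Java sketch of that gating logic, with made-up names (the real check lives in the C++ segment writers shown above):

import java.util.logging.Logger;

// Hypothetical demo; checkSpecifiedRowsets is not a Doris API.
class RowsetCheckDemo {
    private static final Logger LOG = Logger.getLogger("RowsetCheckDemo");

    // Always log when rowsets went missing; only strict mode turns it into an error,
    // because non-strict partial updates recompute missing rowsets in later phases.
    static void checkSpecifiedRowsets(int specified, int expected, boolean strictMode) throws Exception {
        if (specified != expected) {
            LOG.warning(String.format(
                    "some rowsets have been deleted due to compaction (specified=%d, expected=%d)",
                    specified, expected));
            if (strictMode) {
                throw new Exception("some rowsets have been deleted due to compaction in strict mode partial update");
            }
        }
    }

    public static void main(String[] args) {
        try {
            checkSpecifiedRowsets(2, 3, false); // warns only
            checkSpecifiedRowsets(2, 3, true);  // warns, then throws
        } catch (Exception e) {
            System.out.println("strict mode: " + e.getMessage());
        }
    }
}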
From d7b0616dbd14bf1e4f9b526bb9ca4cc0c4ceeeae Mon Sep 17 00:00:00 2001
From: morrySnow <101034200+morrySnow@users.noreply.github.com>
Date: Thu, 18 Apr 2024 20:42:07 +0800
Subject: [PATCH 3/5] [fix](compile) fix two compile errors on MacOS (#33834)

1. macOS uses libhdfs3, so we need to call a different function.
   This compile error was introduced by PR #33680.
2. size_t is not UInt64 on macOS.
   This compile error was introduced by PR #33265.
---
 be/src/io/fs/hdfs_file_writer.cpp              | 4 ++++
 .../aggregate_function_group_array_intersect.h | 8 ++++----
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index 7efb4bfb073d6a..46f5a626e42573 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -60,7 +60,11 @@ Status HdfsFileWriter::close() {
     _closed = true;
     if (_sync_file_data) {
+#ifdef USE_LIBHDFS3
+        int ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+#else
         int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+#endif
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
                                          _fs_name, _path.native(), hdfs_error());
diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
index 03c1639c45aa09..5d627782f25e4d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_group_array_intersect.h
@@ -256,11 +256,11 @@ class AggregateFunctionGroupArrayIntersect
         read_pod_binary(is_set_contains_null, buf);
         data.value->change_contains_null_value(is_set_contains_null);
         read_pod_binary(data.init, buf);
-        size_t size;
+        UInt64 size;
         read_var_uint(size, buf);
 
         T element;
-        for (size_t i = 0; i < size; ++i) {
+        for (UInt64 i = 0; i < size; ++i) {
             read_int_binary(element, buf);
             data.value->insert(static_cast<void*>(&element));
         }
@@ -484,11 +484,11 @@ class AggregateFunctionGroupArrayIntersectGeneric
         read_pod_binary(is_set_contains_null, buf);
         data.value->change_contains_null_value(is_set_contains_null);
         read_pod_binary(data.init, buf);
-        size_t size;
+        UInt64 size;
         read_var_uint(size, buf);
 
         StringRef element;
-        for (size_t i = 0; i < size; ++i) {
+        for (UInt64 i = 0; i < size; ++i) {
             element = read_string_binary_into(*arena, buf);
             data.value->insert((void*)element.data, element.size);
         }
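The second fix in the patch above swaps size_t for UInt64 so the deserialized count has an explicit 64-bit width that matches what read_var_uint expects on every platform (on macOS, size_t is a distinct type from UInt64, so the call fails to compile). The same idea in a Java sketch, using a hypothetical LEB128-style helper (not the Doris read_var_uint): decode the on-wire count into a fixed-width long, never a platform-dependent type.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical demo; VarUintDemo is not part of the Doris codebase.
class VarUintDemo {
    // Reads an unsigned LEB128 varint into a 64-bit long, which is the same
    // width everywhere; this is the point of the UInt64 change in the patch.
    static long readVarUint(InputStream in) throws IOException {
        long value = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new IOException("truncated var-uint");
            value |= (long) (b & 0x7F) << shift; // low 7 bits carry payload
            shift += 7;
        } while ((b & 0x80) != 0);               // high bit signals continuation
        return value;
    }

    public static void main(String[] args) throws IOException {
        byte[] encoded = {(byte) 0xAC, 0x02}; // 300 encoded as LEB128
        System.out.println(readVarUint(new ByteArrayInputStream(encoded))); // prints 300
    }
}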
From 7876ae7dabccb3c70ab71ca4f1c4e5cb8017c330 Mon Sep 17 00:00:00 2001
From: AlexYue
Date: Thu, 18 Apr 2024 22:30:34 +0800
Subject: [PATCH 4/5] [fix](Regression) Don't override the regression's
 enableCacheData and enableStorageVault conf (#33830)

---
 .../src/main/groovy/org/apache/doris/regression/Config.groovy | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
index 08e1d353bb7b48..2b7cd480019626 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
@@ -271,8 +271,8 @@ class Config {
         config.dataPath = FileUtils.getCanonicalPath(cmd.getOptionValue(dataOpt, config.dataPath))
         config.realDataPath = FileUtils.getCanonicalPath(cmd.getOptionValue(realDataOpt, config.realDataPath))
         config.cacheDataPath = cmd.getOptionValue(cacheDataOpt, config.cacheDataPath)
-        config.enableCacheData = Boolean.parseBoolean(cmd.getOptionValue(enableCacheDataOpt, "true"))
-        config.enableStorageVault = Boolean.parseBoolean(cmd.getOptionValue(enableStorageVaultOpt, "true"))
+        config.enableCacheData = Boolean.parseBoolean(cmd.getOptionValue(enableCacheDataOpt, config.enableCacheData.toString()))
+        config.enableStorageVault = Boolean.parseBoolean(cmd.getOptionValue(enableStorageVaultOpt, config.enableStorageVault.toString()))
         config.pluginPath = FileUtils.getCanonicalPath(cmd.getOptionValue(pluginOpt, config.pluginPath))
         config.sslCertificatePath = FileUtils.getCanonicalPath(cmd.getOptionValue(sslCertificateOpt, config.sslCertificatePath))
         config.dorisComposePath = FileUtils.getCanonicalPath(config.dorisComposePath)
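The one-line bug the patch above fixes is a classic option-parsing mistake: passing a constant "true" as the parse fallback meant an absent command-line flag silently overrode whatever the config file had already loaded. A tiny Java sketch of the wrong and right fallback, with a plain Map standing in for the framework's CommandLine and Config objects:

import java.util.Map;

// Hypothetical demo; not the regression framework's actual classes.
class OptionFallbackDemo {
    // Wrong: a hard-coded "true" default discards the config-file value.
    static boolean parseOverriding(Map<String, String> cli) {
        return Boolean.parseBoolean(cli.getOrDefault("enableCacheData", "true"));
    }

    // Right: when the flag is absent, fall back to the value already loaded
    // from the config file, mirroring the patch.
    static boolean parsePreserving(Map<String, String> cli, boolean configValue) {
        return Boolean.parseBoolean(cli.getOrDefault("enableCacheData", Boolean.toString(configValue)));
    }

    public static void main(String[] args) {
        Map<String, String> noFlag = Map.of(); // no command-line flag given
        System.out.println(parseOverriding(noFlag));        // true: config value lost
        System.out.println(parsePreserving(noFlag, false)); // false: config value kept
    }
}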
" + << ", mem tracker label: " << _label + << ", peak consumption: " << _consumption->peak_value() + << print_address_sanitizers(); #endif } g_memtrackerlimiter_cnt << -1; @@ -147,13 +147,13 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) { std::lock_guard l(_address_sanitizers_mtx); auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { - LOG(FATAL) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace() - << ", old stack_trace: " << it->second.stack_trace; + LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", old buf: " << it->first + << ", old size: " << it->second.size + << ", new stack_trace: " << get_stack_trace() + << ", old stack_trace: " << it->second.stack_trace; } // if alignment not equal to 0, maybe usable_size > size. @@ -170,21 +170,21 @@ void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) { auto it = _address_sanitizers.find(buf); if (it != _address_sanitizers.end()) { if (it->second.size != size) { - LOG(FATAL) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " - "label: " - << _label << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() - << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first - << ", old size: " << it->second.size - << ", new stack_trace: " << get_stack_trace() - << ", old stack_trace: " << it->second.stack_trace; + LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker " + "label: " + << _label << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() + << ", buf: " << buf << ", size: " << size << ", old buf: " << it->first + << ", old size: " << it->second.size + << ", new stack_trace: " << get_stack_trace() + << ", old stack_trace: " << it->second.stack_trace; } _address_sanitizers.erase(buf); } else { - LOG(FATAL) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label - << ", consumption: " << _consumption->current_value() - << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf - << ", size: " << size << ", stack_trace: " << get_stack_trace(); + LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label + << ", consumption: " << _consumption->current_value() + << ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf + << ", size: " << size << ", stack_trace: " << get_stack_trace(); } } }