Skip to content

Commit

Permalink
[Fix](auto-increment) Fix duplicate auto-increment column value probl…
Browse files Browse the repository at this point in the history
…em (#43774)

```cpp
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
    // ...
    return _rpc_status;
}
```
should be
```cpp
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
    // ...
    return ResultError(_rpc_status);
}
```
Otherwise, the returned `Result<int64_t>`'s `m_has_val` will be `true`,
then `AutoIncIDBuffer::_launch_async_fetch_task()` will wrongly add an
auto-increment range [0, length) to `_buffers` which will cause
duplicate value problem.

### Release note

Fix duplicate auto-increment column value problem in some situations.
  • Loading branch information
bobhan1 authored Nov 14, 2024
1 parent bdc8f80 commit b4a7240
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 7 deletions.
33 changes: 26 additions & 7 deletions be/src/vec/sink/autoinc_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/thrift_rpc_helper.h"

Expand All @@ -45,10 +46,18 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t batch_size) {
}

Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
LOG_INFO(
"[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch auto-increment values from fe, "
"db_id={}, table_id={}, column_id={}, length={}",
_db_id, _table_id, _column_id, length);
constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
_rpc_status = Status::OK();
TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
for (uint32_t retry_times = 0; retry_times < FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
_rpc_status = Status::InternalError<false>("injected error");
break;
});
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
request.__set_db_id(_db_id);
Expand All @@ -68,8 +77,9 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {

if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, requested to non-master FE@{}:{}, change "
"to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, column_id={}",
"Failed to fetch auto-increment range, requested to non-master FE@{}:{}, "
"change to request to FE@{}:{}. retry_time={}, db_id={}, table_id={}, "
"column_id={}",
master_addr.hostname, master_addr.port, result.master_address.hostname,
result.master_address.port, retry_times, _db_id, _table_id, _column_id);
master_addr = result.master_address;
Expand All @@ -79,15 +89,15 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {

if (!_rpc_status.ok()) {
LOG_WARNING(
"Failed to fetch auto-incremnt range, encounter rpc failure. "
"Failed to fetch auto-increment range, encounter rpc failure. "
"errmsg={}, retry_time={}, db_id={}, table_id={}, column_id={}",
_rpc_status.to_string(), retry_times, _db_id, _table_id, _column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={}, but get "
"Failed to fetch auto-increment range, request length={}, but get "
"result.length={}, retry_time={}, db_id={}, table_id={}, column_id={}",
length, result.length, retry_times, _db_id, _table_id, _column_id);
LOG(WARNING) << msg;
Expand All @@ -97,14 +107,14 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
}

LOG_INFO(
"get auto-incremnt range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"get auto-increment range from FE@{}:{}, start={}, length={}, elapsed={}ms, "
"retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.start, result.length,
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id, _table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
return _rpc_status;
return ResultError(_rpc_status);
}

void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
Expand Down Expand Up @@ -154,10 +164,19 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
auto&& res = _fetch_ids_from_fe(length);
if (!res.has_value()) [[unlikely]] {
auto&& err = res.error();
LOG_WARNING(
"[AutoIncIDBuffer::_launch_async_fetch_task] failed to fetch auto-increment "
"values from fe, db_id={}, table_id={}, column_id={}, status={}",
_db_id, _table_id, _column_id, err);
_is_fetching = false;
return;
}
int64_t start = res.value();
LOG_INFO(
"[AutoIncIDBuffer::_launch_async_fetch_task] successfully fetch auto-increment "
"values from fe, db_id={}, table_id={}, column_id={}, start={}, length={}",
_db_id, _table_id, _column_id, start, length);
{
std::lock_guard<std::mutex> lock {_latch};
_buffers.emplace_back(start, length);
Expand All @@ -168,4 +187,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
return Status::OK();
}

} // namespace doris::vectorized
} // namespace doris::vectorized
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0

-- !sql --
4

-- !sql --
0

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.junit.Assert
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

suite("test_auto_inc_fetch_fail", "nonConcurrent") {

try {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()
def table1 = "test_auto_inc_fetch_fail"
sql "DROP TABLE IF EXISTS ${table1} FORCE;"
sql """ CREATE TABLE IF NOT EXISTS ${table1} (
`k` int,
`c1` int,
`c2` int,
`c3` int,
`id` BIGINT NOT NULL AUTO_INCREMENT(10000),
) UNIQUE KEY(k)
DISTRIBUTED BY HASH(k) BUCKETS 1
PROPERTIES ("replication_num" = "1"); """

GetDebugPoint().enableDebugPointForAllBEs("AutoIncIDBuffer::_fetch_ids_from_fe.failed")

try {
sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
} catch (Exception e) {
logger.info("error : ${e}")
}
qt_sql "select count(*) from ${table1};"

GetDebugPoint().clearDebugPointsForAllBEs()

Thread.sleep(1000)

sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
qt_sql "select count(*) from ${table1};"
qt_sql "select count(*) from ${table1} where id < 10000;"

} catch(Exception e) {
logger.info(e.getMessage())
throw e
} finally {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()
}
}

0 comments on commit b4a7240

Please sign in to comment.