Skip to content

Commit

Permalink
[fix](cluster key) support segcompaction (#45599)
Browse files Browse the repository at this point in the history
support segcompaction for cluster key
  • Loading branch information
mymeiyi authored Dec 27, 2024
1 parent 04e05ea commit fe0724c
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 23 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ DEFINE_Bool(hide_webserver_config_page, "false");
DEFINE_Bool(enable_segcompaction, "true");

// Max number of segments allowed in a single segcompaction task.
DEFINE_Int32(segcompaction_batch_size, "10");
DEFINE_mInt32(segcompaction_batch_size, "10");

// Max row count allowed in a single source segment, bigger segments will be skipped.
DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ DECLARE_Bool(hide_webserver_config_page);
DECLARE_Bool(enable_segcompaction);

// Max number of segments allowed in a single segcompaction task.
DECLARE_Int32(segcompaction_batch_size);
DECLARE_mInt32(segcompaction_batch_size);

// Max row count allowed in a single source segment, bigger segments will be skipped.
DECLARE_Int32(segcompaction_candidate_max_rows);
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,14 @@ void BaseTablet::generate_tablet_meta_copy_unlocked(TabletMeta& new_tablet_meta)
}

Status BaseTablet::calc_delete_bitmap_between_segments(
RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
DeleteBitmapPtr delete_bitmap) {
size_t const num_segments = segments.size();
if (num_segments < 2) {
return Status::OK();
}

OlapStopWatch watch;
auto const rowset_id = rowset->rowset_id();
size_t seq_col_length = 0;
if (_tablet_meta->tablet_schema()->has_sequence_col()) {
auto seq_col_idx = _tablet_meta->tablet_schema()->sequence_col_idx();
Expand Down Expand Up @@ -1690,7 +1689,8 @@ Status BaseTablet::update_delete_bitmap_without_lock(

// calculate delete bitmap between segments if necessary.
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset->rowset_id(), segments,
delete_bitmap));

// get all base rowsets to calculate on
std::vector<RowsetSharedPtr> specified_rowsets;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class BaseTablet {
DeleteBitmapPtr tablet_delete_bitmap = nullptr);

Status calc_delete_bitmap_between_segments(
RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
DeleteBitmapPtr delete_bitmap);

static Status commit_phase_update_delete_bitmap(
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
// if not doing segcompaction, just check segment number
if (!config::enable_segcompaction || !_context.enable_segcompaction ||
!_context.tablet_schema->cluster_key_uids().empty() ||
_context.tablet_schema->num_variant_columns() > 0) {
return _check_segment_number_limit(_num_segment);
}
Expand Down
41 changes: 31 additions & 10 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
std::vector<uint32_t>& return_columns,
std::vector<uint32_t>& return_columns, std::vector<uint32_t>& key_group_cluster_key_idxes,
std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
const auto& ctx = _writer->_context;
bool record_rowids = need_convert_delete_bitmap() && is_key;
Expand All @@ -95,6 +95,19 @@ Status SegcompactionWorker::_get_segcompaction_reader(
read_options.use_page_cache = false;
read_options.tablet_schema = ctx.tablet_schema;
read_options.record_rowids = record_rowids;
if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(ctx.rowset_id, *segments,
delete_bitmap));
for (auto& seg_ptr : *segments) {
auto d = delete_bitmap->get_agg(
{ctx.rowset_id, seg_ptr->id(), DeleteBitmap::TEMP_VERSION_COMMON});
if (d->isEmpty()) {
continue; // Empty delete bitmap for the segment
}
read_options.delete_bitmap.emplace(seg_ptr->id(), std::move(d));
}
}
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
std::map<uint32_t, uint32_t> segment_rows;
for (auto& seg_ptr : *segments) {
Expand Down Expand Up @@ -123,6 +136,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.is_key_column_group = is_key;
reader_params.use_page_cache = false;
reader_params.record_rowids = record_rowids;
reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
return (*reader)->init(reader_params, nullptr);
}

Expand Down Expand Up @@ -190,8 +204,9 @@ Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t e

Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
Merger::Statistics& merger_stat, uint32_t begin,
uint32_t end) {
uint32_t end, bool is_mow_with_cluster_keys) {
uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
uint64_t rows_del_by_bitmap = reader_stat.rows_del_by_bitmap;
uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */
uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
uint64_t output_rows = merger_stat.output_rows; /* rows after merge */
Expand All @@ -205,11 +220,15 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
if (raw_rows_read != sum_src_row) {
uint64_t raw_rows = raw_rows_read;
if (is_mow_with_cluster_keys) {
raw_rows += rows_del_by_bitmap;
}
if (raw_rows != sum_src_row) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction read row num does not match source. expect read row:{}, actual read "
"row:{}",
sum_src_row, raw_rows_read);
"row:{}(raw_rows_read: {}, rows_del_by_bitmap: {})",
sum_src_row, raw_rows, raw_rows_read, rows_del_by_bitmap);
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
Expand Down Expand Up @@ -281,8 +300,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids);
OlapReaderStatistics reader_stats;
std::unique_ptr<vectorized::VerticalBlockReader> reader;
auto s = _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
is_key, column_ids, &reader);
auto s =
_get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
is_key, column_ids, key_group_cluster_key_idxes, &reader);
if (UNLIKELY(reader == nullptr || !s.ok())) {
return Status::Error<SEGCOMPACTION_INIT_READER>(
"failed to get segcompaction reader. err: {}", s.to_string());
Expand All @@ -303,9 +323,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
}

/* check row num after merge/aggregation */
RETURN_NOT_OK_STATUS_WITH_WARN(
_check_correctness(key_reader_stats, key_merger_stats, begin, end),
"check correctness failed");
bool is_mow_with_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
RETURN_NOT_OK_STATUS_WITH_WARN(_check_correctness(key_reader_stats, key_merger_stats, begin,
end, is_mow_with_cluster_keys),
"check correctness failed");
{
std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
_writer->_clear_statistics_for_deleting_segments_unsafe(begin, end);
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ class SegcompactionWorker {
OlapReaderStatistics* stat,
vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
std::vector<uint32_t>& return_columns,
std::vector<uint32_t>& key_group_cluster_key_idxes,
std::unique_ptr<vectorized::VerticalBlockReader>* reader);
std::unique_ptr<segment_v2::SegmentWriter> _create_segcompaction_writer(uint32_t begin,
uint32_t end);
Status _delete_original_segments(uint32_t begin, uint32_t end);
Status _check_correctness(OlapReaderStatistics& reader_stat, Merger::Statistics& merger_stat,
uint32_t begin, uint32_t end);
uint32_t begin, uint32_t end, bool is_mow_with_cluster_keys);
Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);

private:
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
if (segments.size() > 1) {
// calculate delete bitmap between segments
RETURN_IF_ERROR(
_tablet->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_rowset->rowset_id(), segments,
_delete_bitmap));
}

// For partial update, we need to fill in the entire row of data, during the calculation
Expand Down
4 changes: 2 additions & 2 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,8 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
}
if (segments.size() > 1) {
// calculate delete bitmap between segments
status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
delete_bitmap);
status = local_tablet->calc_delete_bitmap_between_segments(rowset->rowset_id(),
segments, delete_bitmap);
if (!status) {
LOG(WARNING) << "failed to calculate delete bitmap"
<< ". tablet_id: " << local_tablet->tablet_id()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee

-- !select_default --
47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee
19 changes: 19 additions & 0 deletions regression-test/data/unique_with_mow_c_p0/test_compact_seg.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
12345 23083 30920 40410

-- !select2 --
17320 24209 30795 40000

-- !select3 --
59832 36673 30343 40299

-- !select1 --
12345 23083 30920 40410

-- !select2 --
17320 24209 30795 40782

-- !select3 --
59832 36673 30586 40739

Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,20 @@ suite("test_segcompaction_unique_keys_mow") {

qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """

String[][] tablets = sql """ show tablets from ${tableName}; """
def row_count = sql """ SELECT count(*) FROM ${tableName}; """
logger.info("row_count: " + row_count)
assertEquals(4999989, row_count[0][0])

def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """
logger.info("duplicated keys: " + result)
assertTrue(result.size() == 0, "There are duplicate keys in the table")

def tablets = sql_return_maparray """ show tablets from ${tableName}; """
for (def tablet in tablets) {
def (code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
}
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_segcompaction_unique_keys_mow_ck") {
for (int i = 0; i < 2; i++) {
def tableName = "segcompaction_unique_keys_regression_test_mow_ck_" + i
String ak = getS3AK()
String sk = getS3SK()
String endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = getS3BucketName()


try {

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
`col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
`col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
`col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
`col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
`col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
`col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
`col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
`col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
`col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
)
UNIQUE KEY(`col_0`) cluster by(`col_1`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
PROPERTIES (
""" + (i == 1 ? "\"function_column.sequence_col\"='col_0', " : "") +
"""
"replication_num" = "1"
);
"""
// "enable_unique_key_merge_on_write" = "true"


def uuid = UUID.randomUUID().toString().replace("-", "0")
def path = "oss://$bucket/regression/segcompaction_test/segcompaction_test.orc"

def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
String columns_str = ("$columns" != "") ? "($columns)" : "";

sql """
LOAD LABEL $uuid (
DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
INTO TABLE $tableName
FORMAT AS "ORC"
$columns_str
)
WITH S3 (
"AWS_ACCESS_KEY" = "$ak",
"AWS_SECRET_KEY" = "$sk",
"AWS_ENDPOINT" = "$endpoint",
"AWS_REGION" = "$region",
"provider" = "${getS3Provider()}"
)
"""

def max_try_milli_secs = 3600000
while (max_try_milli_secs > 0) {
String[][] result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
if (result[0][2].equals("FINISHED")) {
logger.info("Load FINISHED " + " $uuid")
break;
}
if (result[0][2].equals("CANCELLED")) {
logger.info("Load CANCELLED " + " $uuid")
break;
}
Thread.sleep(1000)
max_try_milli_secs -= 1000
if(max_try_milli_secs <= 0) {
assertTrue(1 == 2, "load Timeout: $uuid")
}
}

qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """

def row_count = sql """ SELECT count(*) FROM ${tableName}; """
logger.info("row_count: " + row_count)
assertEquals(4999989, row_count[0][0])

def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """
logger.info("duplicated keys: " + result)
assertTrue(result.size() == 0, "There are duplicate keys in the table")

def tablets = sql_return_maparray """ show tablets from ${tableName}; """
for (def tablet in tablets) {
def (code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
}
} finally {
// try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}
}
Loading

0 comments on commit fe0724c

Please sign in to comment.