Skip to content

Commit

Permalink
[fix](bloom filter)Fix drop column with bloom filter index (apache#44361
Browse files Browse the repository at this point in the history
)

Problem Summary:
1. When dropping a column that has a bloom filter index, we modify the
bloom filter column info.
2. When replaying the edit log, we rebuild the bloom filter info from the table schema.

Related PR: apache#41369

Fix drop column with bloom filter index
  • Loading branch information
qidaye committed Nov 22, 2024
1 parent 2122f13 commit bf4f3a1
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,12 @@ private boolean processDropColumn(DropColumnClause alterClause, OlapTable olapTa
// drop bloom filter column
Set<String> bfCols = olapTable.getCopiedBfColumns();
if (bfCols != null) {
Set<String> newBfCols = new HashSet<>();
Set<String> newBfCols = null;
for (String bfCol : bfCols) {
if (!bfCol.equalsIgnoreCase(dropColName)) {
if (newBfCols == null) {
newBfCols = Sets.newHashSet();
}
newBfCols.add(bfCol);
}
}
Expand Down Expand Up @@ -2912,6 +2915,25 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
LOG.info("finished modify table's add or drop or modify columns. table: {}, job: {}, is replay: {}",
olapTable.getName(), jobId, isReplay);
}
// for bloom filter, rebuild bloom filter info by table schema in replay
if (isReplay) {
Set<String> bfCols = olapTable.getCopiedBfColumns();
if (bfCols != null) {
List<Column> columns = olapTable.getBaseSchema();
Set<String> newBfCols = null;
for (String bfCol : bfCols) {
for (Column column : columns) {
if (column.getName().equalsIgnoreCase(bfCol)) {
if (newBfCols == null) {
newBfCols = Sets.newHashSet();
}
newBfCols.add(column.getName());
}
}
}
olapTable.setBloomFilterInfo(newBfCols, olapTable.getBfFpp());
}
}
}

public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
1 1
1 1 1

-- !select --
1 \N
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,90 @@ suite("test_bloom_filter_drop_column") {

sql """CREATE TABLE IF NOT EXISTS ${table_name} (
`a` varchar(150) NULL,
`c1` varchar(10)
`c1` varchar(10),
`c2` varchar(10)
) ENGINE=OLAP
DUPLICATE KEY(`a`)
DISTRIBUTED BY HASH(`a`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"bloom_filter_columns" = "c1",
"bloom_filter_columns" = "c1, c2",
"in_memory" = "false",
"storage_format" = "V2"
)"""
def timeout = 60000
def delta_time = 1000
def alter_res = "null"
def useTime = 0

sql """INSERT INTO ${table_name} values ('1', '1')"""
// Polls the most recent column ALTER job of `tableName` until its SHOW ALTER
// output contains "FINISHED", or fails the test once the polling budget is used up.
//
//   tableName - table whose latest ALTER TABLE COLUMN job is polled
//   OpTimeout - polling budget in milliseconds; one poll is made per `delta_time` step
//
// Side effects on suite-level variables (kept for code that reads them later):
//   alter_res - stringified result of the last SHOW ALTER query
//   useTime   - the last step value `t` for which the job was still unfinished
def wait_for_latest_op_on_table_finish = { tableName, OpTimeout ->
    // Track completion explicitly. `useTime` alone cannot signal a timeout because it
    // is only ever assigned loop values that are <= OpTimeout.
    boolean finished = false
    for (int t = delta_time; t <= OpTimeout; t += delta_time) {
        alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${tableName}" ORDER BY CreateTime DESC LIMIT 1;"""
        alter_res = alter_res.toString()
        if (alter_res.contains("FINISHED")) {
            sleep(3000) // wait change table state to normal
            // Log the table that was actually polled, not the outer `table_name`.
            logger.info(tableName + " latest alter job finished, detail: " + alter_res)
            finished = true
            break
        }
        useTime = t
        sleep(delta_time)
    }
    assertTrue(finished, "wait_for_latest_op_on_table_finish timeout")
}

// Asserts that the SHOW CREATE TABLE output for `tableName` does (contains == true)
// or does not (contains == false) include the text `expectedCondition`.
//
// The check is tried up to `maxRetries` times, sleeping `waitSeconds` seconds between
// tries, and returns as soon as it holds. If no try succeeds, one more query is issued
// and its result is asserted, so a failure reports the actual output.
// NOTE(review): res[0][1] is presumably the CREATE TABLE statement text — confirm.
def assertShowCreateTableWithRetry = { tableName, expectedCondition, contains, maxRetries, waitSeconds ->
    for (int attempt = 0; attempt < maxRetries; attempt++) {
        def res = sql """SHOW CREATE TABLE ${tableName}"""
        log.info("Attempt ${attempt + 1}: show table: ${res}")

        boolean conditionMet = false
        if (res && res.size() > 0) {
            def present = res[0][1].contains(expectedCondition)
            conditionMet = contains ? present : !present
        }
        if (conditionMet) {
            logger.info("Attempt ${attempt + 1}: Condition met.")
            return
        }

        logger.warn("Attempt ${attempt + 1}: Condition not met. Retrying after ${waitSeconds} second(s)...")
        // No sleep after the last retry; the final check below runs immediately.
        if (attempt + 1 < maxRetries) {
            sleep(waitSeconds * 1000)
        }
    }

    // All retries failed: query once more and assert on that result.
    def finalRes = sql """SHOW CREATE TABLE ${tableName}"""
    log.info("Final attempt: show table: ${finalRes}")
    assertTrue(finalRes && finalRes.size() > 0, "SHOW CREATE TABLE return empty or null")
    if (!contains) {
        assertTrue(!finalRes[0][1].contains(expectedCondition), "expected not to contain \"${expectedCondition}\", actual: ${finalRes[0][1]}")
    } else {
        assertTrue(finalRes[0][1].contains(expectedCondition), "expected to contain \"${expectedCondition}\", actual: ${finalRes[0][1]}")
    }
}

sql """INSERT INTO ${table_name} values ('1', '1', '1')"""
sql "sync"

qt_select """select * from ${table_name} order by a"""

assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"c1, c2\"", true, 3, 30)
// drop column c1
sql """ALTER TABLE ${table_name} DROP COLUMN c1"""
wait_for_latest_op_on_table_finish(table_name, timeout)
sql "sync"

// show create table with retry logic
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"c2\"", true, 3, 30)

// drop column c2
sql """ALTER TABLE ${table_name} DROP COLUMN c2"""
wait_for_latest_op_on_table_finish(table_name, timeout)
sql "sync"
// show create table
def res = sql """SHOW CREATE TABLE ${table_name}"""
assert res[0][1].contains("\"bloom_filter_columns\" = \"\"")

// show create table with retry logic
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"\"", false, 3, 30)

// add new column c1
sql """ALTER TABLE ${table_name} ADD COLUMN c1 ARRAY<STRING>"""
wait_for_latest_op_on_table_finish(table_name, timeout)
sql "sync"

// insert data
sql """INSERT INTO ${table_name} values ('2', null)"""
sql "sync"
Expand Down

0 comments on commit bf4f3a1

Please sign in to comment.