Skip to content

Commit

Permalink
[imporvement](table property) support for alter table property disabl…
Browse files Browse the repository at this point in the history
…e_auto_compaction (apache#27961)

in some case, some tablets may cause coredump or OOM when compaction, and it is necessary to manually close the compaction of a specific table by 'disable_auto_compaction' to make be service available

This commit allow modify disable_auto_compaction table property in schema change.

---------

Signed-off-by: nextdreamblue <[email protected]>
  • Loading branch information
nextdreamblue authored Dec 7, 2023
1 parent cb9a6f6 commit 8526b9f
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 8 deletions.
12 changes: 12 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,18 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest&
tablet_meta_info.enable_single_replica_compaction);
need_to_save = true;
}
if (tablet_meta_info.__isset.disable_auto_compaction) {
std::shared_lock rlock(tablet->get_header_lock());
tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
tablet_meta_info.disable_auto_compaction);
for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) {
rowset_meta->tablet_schema()->set_disable_auto_compaction(
tablet_meta_info.disable_auto_compaction);
}
tablet->tablet_schema_unlocked()->set_disable_auto_compaction(
tablet_meta_info.disable_auto_compaction);
need_to_save = true;
}

if (tablet_meta_info.__isset.skip_write_index_on_load) {
std::shared_lock rlock(tablet->get_header_lock());
Expand Down
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD));
((SchemaChangeHandler) schemaChangeHandler).updateTableProperties(db, tableName, properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2194,6 +2194,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty()
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
LOG.info("Properties already up-to-date");
Expand All @@ -2206,6 +2207,12 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
enableSingleCompaction = Boolean.parseBoolean(singleCompaction) ? 1 : 0;
}

String disableAutoCompactionBoolean = properties.get(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION);
int disableAutoCompaction = -1; // < 0 means don't update
if (disableAutoCompactionBoolean != null) {
disableAutoCompaction = Boolean.parseBoolean(disableAutoCompactionBoolean) ? 1 : 0;
}

String skipWriteIndexOnLoad = properties.get(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD);
int skip = -1; // < 0 means don't update
if (skipWriteIndexOnLoad != null) {
Expand All @@ -2214,7 +2221,8 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str

for (Partition partition : partitions) {
updatePartitionProperties(db, olapTable.getName(), partition.getName(), storagePolicyId, isInMemory,
null, compactionPolicy, timeSeriesCompactionConfig, enableSingleCompaction, skip);
null, compactionPolicy, timeSeriesCompactionConfig, enableSingleCompaction, skip,
disableAutoCompaction);
}

olapTable.writeLockOrDdlException();
Expand Down Expand Up @@ -2257,7 +2265,7 @@ public void updatePartitionsProperties(Database db, String tableName, List<Strin
for (String partitionName : partitionNames) {
try {
updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId,
isInMemory, null, null, null, -1, -1);
isInMemory, null, null, null, -1, -1, -1);
} catch (Exception e) {
String errMsg = "Failed to update partition[" + partitionName + "]'s 'in_memory' property. "
+ "The reason is [" + e.getMessage() + "]";
Expand All @@ -2273,7 +2281,8 @@ public void updatePartitionsProperties(Database db, String tableName, List<Strin
public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId,
int isInMemory, BinlogConfig binlogConfig, String compactionPolicy,
Map<String, Long> timeSeriesCompactionConfig,
int enableSingleCompaction, int skipWriteIndexOnLoad) throws UserException {
int enableSingleCompaction, int skipWriteIndexOnLoad,
int disableAutoCompaction) throws UserException {
// be id -> <tablet id,schemaHash>
Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap();
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
Expand Down Expand Up @@ -2306,7 +2315,8 @@ public void updatePartitionProperties(Database db, String tableName, String part
countDownLatch.addMark(kv.getKey(), kv.getValue());
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
storagePolicyId, binlogConfig, countDownLatch, compactionPolicy,
timeSeriesCompactionConfig, enableSingleCompaction, skipWriteIndexOnLoad);
timeSeriesCompactionConfig, enableSingleCompaction, skipWriteIndexOnLoad,
disableAutoCompaction);
batchTask.addTask(task);
}
if (!FeConstants.runningUnitTest) {
Expand Down Expand Up @@ -3011,7 +3021,7 @@ public boolean updateBinlogConfig(Database db, OlapTable olapTable, List<AlterCl

for (Partition partition : partitions) {
updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1,
newBinlogConfig, null, null, -1, -1);
newBinlogConfig, null, null, -1, -1, -1);
}

olapTable.writeLockOrDdlException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
}
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)) {
if (!properties.get(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION).equalsIgnoreCase("true")
&& !properties.get(PropertyAnalyzer
.PROPERTIES_DISABLE_AUTO_COMPACTION).equalsIgnoreCase("false")) {
throw new AnalysisException(
"Property "
+ PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION + " should be set to true or false");
}
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) {
long groupCommitIntervalMs;
String groupCommitIntervalMsStr = properties.get(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
Expand Down
1 change: 1 addition & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -4698,6 +4698,7 @@ public void modifyTableProperties(Database db, OlapTable table, Map<String, Stri
.buildTimeSeriesCompactionFileCountThreshold()
.buildTimeSeriesCompactionTimeThresholdSeconds()
.buildSkipWriteIndexOnLoad()
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction();

// need to update partition info meta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public TableProperty buildProperty(short opCode) {
buildTimeSeriesCompactionTimeThresholdSeconds();
buildSkipWriteIndexOnLoad();
buildEnableSingleReplicaCompaction();
buildDisableAutoCompaction();
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
// < 0 means not to update property, > 0 means true, == 0 means false
private int enableSingleReplicaCompaction = -1;
private int skipWriteIndexOnLoad = -1;
private int disableAutoCompaction = -1;

public UpdateTabletMetaInfoTask(long backendId, Set<Pair<Long, Integer>> tableIdWithSchemaHash) {
super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO,
Expand Down Expand Up @@ -87,12 +88,14 @@ public UpdateTabletMetaInfoTask(long backendId,
String compactionPolicy,
Map<String, Long> timeSeriesCompactionConfig,
int enableSingleReplicaCompaction,
int skipWriteIndexOnLoad) {
int skipWriteIndexOnLoad,
int disableAutoCompaction) {
this(backendId, tableIdWithSchemaHash, inMemory, storagePolicyId, binlogConfig, latch);
this.compactionPolicy = compactionPolicy;
this.timeSeriesCompactionConfig = timeSeriesCompactionConfig;
this.enableSingleReplicaCompaction = enableSingleReplicaCompaction;
this.skipWriteIndexOnLoad = skipWriteIndexOnLoad;
this.disableAutoCompaction = disableAutoCompaction;
}

public void countDownLatch(long backendId, Set<Pair<Long, Integer>> tablets) {
Expand Down Expand Up @@ -159,6 +162,9 @@ public TUpdateTabletMetaInfoReq toThrift() {
if (skipWriteIndexOnLoad >= 0) {
metaInfo.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad > 0);
}
if (disableAutoCompaction >= 0) {
metaInfo.setDisableAutoCompaction(disableAutoCompaction > 0);
}
updateTabletMetaInfoReq.addToTabletMetaInfos(metaInfo);
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ struct TTabletMetaInfo {
13: optional i64 time_series_compaction_time_threshold_seconds
14: optional bool enable_single_replica_compaction
15: optional bool skip_write_index_on_load
16: optional bool disable_auto_compaction
}

struct TUpdateTabletMetaInfoReq {
Expand Down Expand Up @@ -507,4 +508,4 @@ struct TTopicUpdate {
struct TAgentPublishRequest {
1: required TAgentServiceVersion protocol_version
2: required list<TTopicUpdate> updates
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ suite("test_alter_table_property") {
logger.info("${showResult2}")
assertTrue(showResult2.toString().containsIgnoreCase('"enable_single_replica_compaction" = "true"'))

assertTrue(showResult1.toString().containsIgnoreCase('"disable_auto_compaction" = "false"'))
sql """
alter table ${tableName} set ("disable_auto_compaction" = "true")
"""
sql """sync"""

def showResult3 = sql """show create table ${tableName}"""
logger.info("${showResult3}")
assertTrue(showResult3.toString().containsIgnoreCase('"disable_auto_compaction" = "true"'))

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """sync"""
}
}

0 comments on commit 8526b9f

Please sign in to comment.