diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index c92a58aa9f5d..ad792016c35a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -110,6 +110,7 @@ private TableConfigUtils() { // supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}. private static final String UPSERT_COMPACTION_TASK_TYPE = "UpsertCompactionTask"; + private static final String UPSERT_COMPACT_MERGE_TASK_TYPE = "UpsertCompactMergeTask"; // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency. @@ -752,11 +753,13 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) Preconditions.checkState(upsertConfig.isEnableSnapshot(), "enableDeletedKeysCompactionConsistency should exist with enableSnapshot for upsert table"); - // enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask + // enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask / UpsertCompactMergeTask TableTaskConfig taskConfig = tableConfig.getTaskConfig(); - Preconditions.checkState( - taskConfig != null && taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACTION_TASK_TYPE), - "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask for upsert table"); + Preconditions.checkState(taskConfig != null + && (taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACTION_TASK_TYPE) + || taskConfig.getTaskTypeConfigsMap().containsKey(UPSERT_COMPACT_MERGE_TASK_TYPE)), + "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask" + + " / UpsertCompactMergeTask for upsert table"); } if (upsertConfig.getConsistencyMode() != UpsertConfig.ConsistencyMode.NONE) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 72a17ee7d1c6..88691dd8c15f 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -2067,7 +2067,7 @@ public void testValidateUpsertConfig() { "enableDeletedKeysCompactionConsistency should exist with enableSnapshot for upsert table"); } - // test enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask + // test enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask / UpsertCompactMerge task upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); upsertConfig.setEnableDeletedKeysCompactionConsistency(true); upsertConfig.setDeletedKeysTTL(100); @@ -2080,7 +2080,8 @@ public void testValidateUpsertConfig() { TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema); } catch (IllegalStateException e) { Assert.assertEquals(e.getMessage(), - "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask for upsert table"); + "enableDeletedKeysCompactionConsistency should exist with UpsertCompactionTask " + + "/ UpsertCompactMergeTask for upsert table"); } }