Skip to content

Commit

Permalink
[core] Add validation to forbid setting streaming-read-overwrite with 'full-compaction' or 'lookup' changelog producer (#2180)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Oct 30, 2023
1 parent 5098f11 commit 67dc98e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@
<td><h5>streaming-read-overwrite</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to read the changes from overwrite in streaming mode.</td>
<td>Whether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes.</td>
</tr>
<tr>
<td><h5>tag.automatic-creation</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,8 @@ public class CoreOptions implements Serializable {
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to read the changes from overwrite in streaming mode.");
"Whether to read the changes from overwrite in streaming mode. Cannot be set to true when "
+ "changelog producer is full-compaction or lookup because it will read duplicated changes.");

public static final ConfigOption<Boolean> DYNAMIC_PARTITION_OVERWRITE =
key("dynamic-partition-overwrite")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -87,6 +88,16 @@ public static void validateTableSchema(TableSchema schema) {
"Can not set %s on table without primary keys, please define primary keys.",
CHANGELOG_PRODUCER.key()));
}
if (options.streamingReadOverwrite()
&& (changelogProducer == ChangelogProducer.FULL_COMPACTION
|| changelogProducer == ChangelogProducer.LOOKUP)) {
throw new UnsupportedOperationException(
String.format(
"Cannot set %s to true when changelog producer is %s or %s because it will read duplicated changes.",
STREAMING_READ_OVERWRITE.key(),
ChangelogProducer.FULL_COMPACTION,
ChangelogProducer.LOOKUP));
}

checkArgument(
options.snapshotNumRetainMin() > 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.IntType;
import org.apache.paimon.utils.BlockingIterator;

Expand Down Expand Up @@ -682,4 +683,25 @@ private void assertPartitionsTable(String tableName) throws Exception {
assertThat((String) row.getField(0)).containsAnyOf("[1]", "[2]", "[3]", "[4]");
}
}

/**
 * Creating a primary-key table with 'streaming-read-overwrite' = 'true' must be rejected with an
 * {@link UnsupportedOperationException} for both the 'full-compaction' and the 'lookup' changelog
 * producer.
 */
@Test
public void testInvalidStreamingReadOverwrite() {
    // '%s' is the placeholder for the changelog producer under test.
    final String createTableTemplate =
            "CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b STRING)"
                    + "WITH ('changelog-producer' = '%s', 'streaming-read-overwrite' = 'true')";

    // The same message is expected for both producers.
    final String expectedMessage =
            "Cannot set streaming-read-overwrite to true when changelog producer "
                    + "is full-compaction or lookup because it will read duplicated changes.";

    for (String producer : new String[] {"full-compaction", "lookup"}) {
        assertThatThrownBy(() -> sql(createTableTemplate, producer))
                .satisfies(
                        AssertionUtils.anyCauseMatches(
                                UnsupportedOperationException.class, expectedMessage));
    }
}
}

0 comments on commit 67dc98e

Please sign in to comment.