From 6e044019a190048f15e12e6b639be4e5dff579df Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 22 May 2024 16:47:18 +0800 Subject: [PATCH] [core] Fix that ignore-delete hasn't handle rowkind.field (#3365) --- .../paimon/table/AbstractFileStoreTable.java | 5 +++++ .../table/AppendOnlyFileStoreTable.java | 7 ++++--- .../table/PrimaryKeyFileStoreTable.java | 21 +++++++------------ .../paimon/table/sink/RowKindGenerator.java | 4 ++++ .../paimon/table/sink/TableWriteImpl.java | 16 +++++++++----- .../paimon/flink/sink/LocalMergeOperator.java | 3 +-- .../paimon/flink/BatchFileStoreITCase.java | 16 ++++++++++++++ 7 files changed, 48 insertions(+), 24 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 434264dbbd09..baaac994494c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -40,6 +40,7 @@ import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor; import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor; import org.apache.paimon.table.sink.RowKeyExtractor; +import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor; import org.apache.paimon.table.source.InnerStreamTableScan; @@ -578,6 +579,10 @@ private RollbackHelper rollbackHelper() { store().newTagDeletion()); } + protected RowKindGenerator rowKindGenerator() { + return RowKindGenerator.create(schema(), store().options()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 810669f2e7f1..f45865d018c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -147,13 +147,14 @@ public TableWriteImpl newWrite( return new TableWriteImpl<>( writer, createRowKeyExtractor(), - record -> { + (record, rowKind) -> { Preconditions.checkState( - record.row().getRowKind() == RowKind.INSERT, + rowKind.isAdd(), "Append only writer can not accept row with RowKind %s", - record.row().getRowKind()); + rowKind); return record.row(); }, + rowKindGenerator(), CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index bf26ec31cf2f..8c6925d73a51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -21,7 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueFileStore; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; @@ -35,13 +34,11 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; -import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; -import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import java.util.List; @@ -167,21 +164,17 @@ public TableWriteImpl newWrite(String commitUser) { @Override public TableWriteImpl newWrite( String commitUser, ManifestCacheFilter manifestFilter) { - TableSchema schema = schema(); - CoreOptions options = store().options(); - RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options); KeyValue kv = new KeyValue(); return new TableWriteImpl<>( store().newWrite(commitUser, manifestFilter), createRowKeyExtractor(), - record -> { - InternalRow row = record.row(); - RowKind rowKind = - rowKindGenerator == null - ? row.getRowKind() - : rowKindGenerator.generate(row); - return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row); - }, + (record, rowKind) -> + kv.replace( + record.primaryKey(), + KeyValue.UNKNOWN_SEQUENCE, + rowKind, + record.row()), + rowKindGenerator(), CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java index de55afceb7e3..1834fdffae61 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java @@ -62,4 +62,8 @@ public static RowKindGenerator create(TableSchema schema, CoreOptions options) { .map(field -> new RowKindGenerator(field, schema.logicalRowType())) .orElse(null); } + + public static RowKind getRowKind(@Nullable RowKindGenerator rowKindGenerator, InternalRow row) { + return rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 9f63a410f020..cd1af85d3be8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -30,6 +30,7 @@ import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.FileStoreWrite.State; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.Restorable; import javax.annotation.Nullable; @@ -49,6 +50,7 @@ public class TableWriteImpl implements InnerTableWrite, Restorable write; private final KeyAndBucketExtractor keyAndBucketExtractor; private final RecordExtractor recordExtractor; + @Nullable private final RowKindGenerator rowKindGenerator; private final boolean ignoreDelete; private boolean batchCommitted = false; @@ -58,10 +60,12 @@ public TableWriteImpl( FileStoreWrite write, KeyAndBucketExtractor keyAndBucketExtractor, RecordExtractor recordExtractor, + @Nullable RowKindGenerator rowKindGenerator, boolean ignoreDelete) { this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; + this.rowKindGenerator = rowKindGenerator; this.ignoreDelete = ignoreDelete; } @@ -128,21 +132,23 @@ public void write(InternalRow row, int bucket) throws Exception { @Nullable public SinkRecord writeAndReturn(InternalRow row) throws Exception { - if (ignoreDelete && row.getRowKind().isRetract()) { + RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); + if (ignoreDelete && rowKind.isRetract()) { return null; } SinkRecord record = toSinkRecord(row); - write.write(record.partition(), record.bucket(), recordExtractor.extract(record)); + write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind)); return record; } @Nullable public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { - if (ignoreDelete && row.getRowKind().isRetract()) { + RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); + if (ignoreDelete && rowKind.isRetract()) { return null; } SinkRecord record = toSinkRecord(row, bucket); - write.write(record.partition(), bucket, recordExtractor.extract(record)); + write.write(record.partition(), bucket, recordExtractor.extract(record, rowKind)); return record; } @@ -231,6 +237,6 @@ public FileStoreWrite getWrite() { /** Extractor to extract {@link T} from the {@link SinkRecord}. */ public interface RecordExtractor { - T extract(SinkRecord record); + T extract(SinkRecord record, RowKind rowKind); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index a09372443c8d..273f383088d9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -137,8 +137,7 @@ public void processElement(StreamRecord record) throws Exception { recordCount++; InternalRow row = record.getValue(); - RowKind rowKind = - rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row); + RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); if (ignoreDelete && rowKind.isRetract()) { return; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 2758b73529bb..55bd886ad072 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -422,6 +422,22 @@ public void testIgnoreDelete() { assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B")); } + @Test + public void testIgnoreDeleteWithRowKindField() { + sql( + "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) " + + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')"); + + sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I")); + + sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I")); + + sql("INSERT INTO ignore_delete VALUES (1, 'B', '+I')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B", "+I")); + } + @Test public void testDeleteWithPkLookup() throws Exception { sql(