From 9d13d2c8ffca4c71b8d7de5cbb0a7c697879f77c Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Fri, 9 Aug 2024 22:20:11 +0800 Subject: [PATCH] [core] support deletion vector with input ChangelogProducer --- docs/content/concepts/spec/snapshot.md | 2 +- .../paimon/append/AppendOnlyWriter.java | 2 +- .../apache/paimon/io/RecordLevelExpire.java | 4 +- .../paimon/schema/SchemaValidation.java | 3 +- .../source/snapshot/DeltaFollowUpScanner.java | 1 + .../manifest/ManifestFileMetaTestBase.java | 4 +- .../sink/MultiTablesStoreCompactOperator.java | 2 +- .../paimon/flink/DeletionVectorITCase.java | 47 +++++++++++++++++++ 8 files changed, 57 insertions(+), 8 deletions(-) diff --git a/docs/content/concepts/spec/snapshot.md b/docs/content/concepts/spec/snapshot.md index d1059827243b..30b5fa0d0bd2 100644 --- a/docs/content/concepts/spec/snapshot.md +++ b/docs/content/concepts/spec/snapshot.md @@ -61,5 +61,5 @@ Snapshot File is JSON, it includes: 12. totalRecordCount: record count of all changes occurred in this snapshot. 13. deltaRecordCount: record count of all new changes occurred in this snapshot. 14. changelogRecordCount: record count of all changelog produced in this snapshot. -15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark. +15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark. 16. statistics: stats file name for statistics of this table. 
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 17ebe215ee1a..6fda88fd8c53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -84,7 +84,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final FileIndexOptions fileIndexOptions; private MemorySegmentPool memorySegmentPool; - private MemorySize maxDiskSize; + private final MemorySize maxDiskSize; public AppendOnlyWriter( FileIO fileIO, diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java index a49e31fc91bd..4a61b66a798a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java @@ -69,7 +69,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) { return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds()); } - public RecordLevelExpire(int timeField, int expireTime) { + private RecordLevelExpire(int timeField, int expireTime) { this.timeField = timeField; this.expireTime = expireTime; } @@ -78,7 +78,7 @@ public FileReaderFactory wrap(FileReaderFactory readerFactor return file -> wrap(readerFactory.createRecordReader(file)); } - public RecordReader wrap(RecordReader reader) { + private RecordReader wrap(RecordReader reader) { int currentTime = (int) (System.currentTimeMillis() / 1000); return reader.filter( kv -> { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 791d38b2cee2..b2c3ed7650f9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -487,8 +487,9 @@ private static void validateDefaultValues(TableSchema schema) { private static void validateForDeletionVectors(CoreOptions options) { checkArgument( options.changelogProducer() == ChangelogProducer.NONE + || options.changelogProducer() == ChangelogProducer.INPUT || options.changelogProducer() == ChangelogProducer.LOOKUP, - "Deletion vectors mode is only supported for none or lookup changelog producer now."); + "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now."); checkArgument( !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index f4b3c69ba8b1..fdb3cb7ff462 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -133,7 +133,7 @@ protected ManifestFile createManifestFile(String pathStr) { path, getPartitionType(), "default", - CoreOptions.FILE_FORMAT.defaultValue().toString()), + CoreOptions.FILE_FORMAT.defaultValue()), Long.MAX_VALUE, null) .create(); @@ -166,7 +166,7 @@ protected void assertSameContent( protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean hasPartition) { List<ManifestFileMeta> input = new ArrayList<>(); - // base with 3 partition ,16 entry each parition + // base with 3 partition, 16 entry each partition for (int j = 0; j < 3; j++) { List<ManifestEntry> entrys = new ArrayList<>(); for (int i = 0; i < 16; i++) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index f253a3bf8e79..ccbccaa5fa7c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -51,7 +51,7 @@ * A dedicated operator for manual triggered compaction. * *
<p>
In-coming records are generated by sources built from {@link - * org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain + * org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain * partition keys, bucket number, table name and database name. */ public class MultiTablesStoreCompactOperator diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java index 1fe424b331e5..9f27f9405f03 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java @@ -30,6 +30,53 @@ /** ITCase for deletion vector table. */ public class DeletionVectorITCase extends CatalogITCaseBase { + @ParameterizedTest + @ValueSource(strings = {"input"}) + public void testStreamingReadDVTableWhenChangelogProducerIsInput(String changelogProducer) + throws Exception { + sql( + String.format( + "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) " + + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')", + changelogProducer)); + + sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')"); + + sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')"); + + sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')"); + + // test read from APPEND snapshot + try (BlockingIterator iter = + streamSqlBlockIter( + "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) { + assertThat(iter.collect(8)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1, "111111111"), + Row.ofKind(RowKind.INSERT, 2, "2"), + Row.ofKind(RowKind.INSERT, 3, "3"), + Row.ofKind(RowKind.INSERT, 4, "4"), + Row.ofKind(RowKind.INSERT, 2, "2_1"), + Row.ofKind(RowKind.INSERT, 3, "3_1"), + 
Row.ofKind(RowKind.INSERT, 2, "2_2"), + Row.ofKind(RowKind.INSERT, 4, "4_1")); + } + + // test read from COMPACT snapshot + try (BlockingIterator iter = + streamSqlBlockIter( + "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) { + assertThat(iter.collect(6)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1, "111111111"), + Row.ofKind(RowKind.INSERT, 2, "2_1"), + Row.ofKind(RowKind.INSERT, 3, "3_1"), + Row.ofKind(RowKind.INSERT, 4, "4"), + Row.ofKind(RowKind.INSERT, 2, "2_2"), + Row.ofKind(RowKind.INSERT, 4, "4_1")); + } + } + @ParameterizedTest @ValueSource(strings = {"none", "lookup"}) public void testStreamingReadDVTable(String changelogProducer) throws Exception {