Skip to content

Commit

Permalink
[core] support deletion vector with input ChangelogProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
hongli.wwj committed Aug 9, 2024
1 parent 265937e commit 9d13d2c
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/spec/snapshot.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ Snapshot File is JSON, it includes:
12. totalRecordCount: record count of all changes occurred in this snapshot.
13. deltaRecordCount: record count of all new changes occurred in this snapshot.
14. changelogRecordCount: record count of all changelog produced in this snapshot.
15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
16. statistics: stats file name for statistics of this table.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final FileIndexOptions fileIndexOptions;

private MemorySegmentPool memorySegmentPool;
private MemorySize maxDiskSize;
private final MemorySize maxDiskSize;

public AppendOnlyWriter(
FileIO fileIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
}

public RecordLevelExpire(int timeField, int expireTime) {
private RecordLevelExpire(int timeField, int expireTime) {
this.timeField = timeField;
this.expireTime = expireTime;
}
Expand All @@ -78,7 +78,7 @@ public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactor
return file -> wrap(readerFactory.createRecordReader(file));
}

public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(
kv -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ private static void validateDefaultValues(TableSchema schema) {
private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
|| options.changelogProducer() == ChangelogProducer.INPUT
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
"Deletion vectors mode is only supported for none or lookup changelog producer now.");
"Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now.");

checkArgument(
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class DeltaFollowUpScanner implements FollowUpScanner {
@Override
public boolean shouldScanSnapshot(Snapshot snapshot) {
if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
System.out.println("here! : " + snapshot.id());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected ManifestFile createManifestFile(String pathStr) {
path,
getPartitionType(),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString()),
CoreOptions.FILE_FORMAT.defaultValue()),
Long.MAX_VALUE,
null)
.create();
Expand Down Expand Up @@ -166,7 +166,7 @@ protected void assertSameContent(

protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean hasPartition) {
List<ManifestFileMeta> input = new ArrayList<>();
// base with 3 partition ,16 entry each parition
// base with 3 partition, 16 entry each partition
for (int j = 0; j < 3; j++) {
List<ManifestEntry> entrys = new ArrayList<>();
for (int i = 0; i < 16; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* A dedicated operator for manual triggered compaction.
*
* <p>In-coming records are generated by sources built from {@link
* org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain
* org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain
* partition keys, bucket number, table name and database name.
*/
public class MultiTablesStoreCompactOperator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,53 @@
/** ITCase for deletion vector table. */
public class DeletionVectorITCase extends CatalogITCaseBase {

@ParameterizedTest
@ValueSource(strings = {"input"})
public void testStreamingReadDVTableWhenChangelogProducerIsInput(String changelogProducer)
throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')",
changelogProducer));

sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");

sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");

sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");

// test read from APPEND snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
assertThat(iter.collect(8))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
Row.ofKind(RowKind.INSERT, 2, "2"),
Row.ofKind(RowKind.INSERT, 3, "3"),
Row.ofKind(RowKind.INSERT, 4, "4"),
Row.ofKind(RowKind.INSERT, 2, "2_1"),
Row.ofKind(RowKind.INSERT, 3, "3_1"),
Row.ofKind(RowKind.INSERT, 2, "2_2"),
Row.ofKind(RowKind.INSERT, 4, "4_1"));
}

// test read from COMPACT snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
assertThat(iter.collect(6))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
Row.ofKind(RowKind.INSERT, 2, "2_1"),
Row.ofKind(RowKind.INSERT, 3, "3_1"),
Row.ofKind(RowKind.INSERT, 4, "4"),
Row.ofKind(RowKind.INSERT, 2, "2_2"),
Row.ofKind(RowKind.INSERT, 4, "4_1"));
}
}

@ParameterizedTest
@ValueSource(strings = {"none", "lookup"})
public void testStreamingReadDVTable(String changelogProducer) throws Exception {
Expand Down

0 comments on commit 9d13d2c

Please sign in to comment.