Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] support deletion vector with input ChangelogProducer #3929

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content/concepts/spec/snapshot.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ Snapshot File is JSON, it includes:
12. totalRecordCount: record count of all changes occurred in this snapshot.
13. deltaRecordCount: record count of all new changes occurred in this snapshot.
14. changelogRecordCount: record count of all changelog produced in this snapshot.
15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
16. statistics: stats file name for statistics of this table.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final FileIndexOptions fileIndexOptions;

private MemorySegmentPool memorySegmentPool;
private MemorySize maxDiskSize;
private final MemorySize maxDiskSize;

public AppendOnlyWriter(
FileIO fileIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
}

public RecordLevelExpire(int timeField, int expireTime) {
private RecordLevelExpire(int timeField, int expireTime) {
this.timeField = timeField;
this.expireTime = expireTime;
}
Expand All @@ -78,7 +78,7 @@ public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactor
return file -> wrap(readerFactory.createRecordReader(file));
}

public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(
kv -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,9 @@ private static void validateDefaultValues(TableSchema schema) {
private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
|| options.changelogProducer() == ChangelogProducer.INPUT
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
"Deletion vectors mode is only supported for none or lookup changelog producer now.");
"Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now.");

checkArgument(
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected ManifestFile createManifestFile(String pathStr) {
path,
getPartitionType(),
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString()),
CoreOptions.FILE_FORMAT.defaultValue()),
Long.MAX_VALUE,
null)
.create();
Expand Down Expand Up @@ -166,7 +166,7 @@ protected void assertSameContent(

protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean hasPartition) {
List<ManifestFileMeta> input = new ArrayList<>();
// base with 3 partition ,16 entry each parition
// base with 3 partition, 16 entry each partition
for (int j = 0; j < 3; j++) {
List<ManifestEntry> entrys = new ArrayList<>();
for (int i = 0; i < 16; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* A dedicated operator for manual triggered compaction.
*
* <p>In-coming records are generated by sources built from {@link
* org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain
* org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain
* partition keys, bucket number, table name and database name.
*/
public class MultiTablesStoreCompactOperator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,53 @@
/** ITCase for deletion vector table. */
public class DeletionVectorITCase extends CatalogITCaseBase {

@ParameterizedTest
@ValueSource(strings = {"input"})
public void testStreamingReadDVTableWhenChangelogProducerIsInput(String changelogProducer)
throws Exception {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')",
changelogProducer));

sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");

sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");

sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");

// test read from APPEND snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
assertThat(iter.collect(8))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
Row.ofKind(RowKind.INSERT, 2, "2"),
Row.ofKind(RowKind.INSERT, 3, "3"),
Row.ofKind(RowKind.INSERT, 4, "4"),
Row.ofKind(RowKind.INSERT, 2, "2_1"),
Row.ofKind(RowKind.INSERT, 3, "3_1"),
Row.ofKind(RowKind.INSERT, 2, "2_2"),
Row.ofKind(RowKind.INSERT, 4, "4_1"));
}

// test read from COMPACT snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
"SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
assertThat(iter.collect(6))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
Row.ofKind(RowKind.INSERT, 2, "2_1"),
Row.ofKind(RowKind.INSERT, 3, "3_1"),
Row.ofKind(RowKind.INSERT, 4, "4"),
Row.ofKind(RowKind.INSERT, 2, "2_2"),
Row.ofKind(RowKind.INSERT, 4, "4_1"));
}
}

@ParameterizedTest
@ValueSource(strings = {"none", "lookup"})
public void testStreamingReadDVTable(String changelogProducer) throws Exception {
Expand Down
Loading