Skip to content

Commit

Permalink
[core] fix the issue where streaming reading of overwrite data would …
Browse files Browse the repository at this point in the history
…fail when retract type data appeared. (apache#4697)
  • Loading branch information
liming30 authored Dec 13, 2024
1 parent a0680bd commit 00a36e3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ private SplitRead<InternalRow> create(Supplier<MergeFileSplitRead> supplier) {
ConcatRecordReader.create(
() ->
new ReverseReader(
read.createNoMergeReader(
read.createMergeReader(
split.partition(),
split.bucket(),
split.beforeFiles(),
split.beforeDeletionFiles()
.orElse(null),
true)),
false)),
() ->
read.createNoMergeReader(
read.createMergeReader(
split.partition(),
split.bucket(),
split.dataFiles(),
split.deletionFiles().orElse(null),
true));
false));
return unwrap(reader);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,43 @@ public void testStreamingReadOverwriteWithoutPartitionedRecords() throws Excepti
streamingItr.close();
}

@Test
public void testStreamingReadOverwriteWithDeleteRecords() throws Exception {
String table =
createTable(
Arrays.asList("currency STRING", "rate BIGINT", "dt STRING"),
Collections.singletonList("currency"),
Collections.emptyList(),
Collections.emptyList(),
streamingReadOverwrite);

insertInto(
table,
"('US Dollar', 102, '2022-01-01')",
"('Yen', 1, '2022-01-02')",
"('Euro', 119, '2022-01-02')");

bEnv.executeSql(String.format("DELETE FROM %s WHERE currency = 'Euro'", table)).await();

checkFileStorePath(table, Collections.emptyList());

// test projection and filter
BlockingIterator<Row, Row> streamingItr =
testStreamingRead(
buildQuery(table, "currency, rate", "WHERE dt = '2022-01-02'"),
Collections.singletonList(changelogRow("+I", "Yen", 1L)));

insertOverwrite(table, "('US Dollar', 100, '2022-01-02')", "('Yen', 10, '2022-01-01')");

validateStreamingReadResult(
streamingItr,
Arrays.asList(
changelogRow("-D", "Yen", 1L), changelogRow("+I", "US Dollar", 100L)));
assertNoMoreRecords(streamingItr);

streamingItr.close();
}

@Test
public void testUnsupportStreamingReadOverwriteWithoutPk() {
assertThatThrownBy(
Expand Down

0 comments on commit 00a36e3

Please sign in to comment.