Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jan 31, 2024
1 parent d877c44 commit fe6d64b
Showing 1 changed file with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@
import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapAgg;
import org.apache.paimon.mergetree.compact.aggregate.FieldNestedUpdateAgg;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.CommonTestUtils;

import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
Expand Down Expand Up @@ -1444,6 +1443,12 @@ private boolean checkNestedTable(Row[] nestedTable, Row... subOrders) {
/** ITCase for {@link FieldCollectAgg}. */
public static class CollectAggregationITCase extends CatalogITCaseBase {

@Override
protected int defaultParallelism() {
// set parallelism to 1 so that the order of input data is determined
return 1;
}

@Test
public void testAggWithDistinct() {
sql(
Expand Down Expand Up @@ -1515,7 +1520,6 @@ public void testAggWithoutDistinct() {

@Test
public void testRetractWithAggregation() throws Exception {
sql("CREATE TABLE INPUT (id INT PRIMARY KEY NOT ENFORCED, f0 ARRAY<STRING>)");
sql(
"CREATE TABLE test_collect("
+ " id INT PRIMARY KEY NOT ENFORCED,"
Expand All @@ -1525,18 +1529,11 @@ public void testRetractWithAggregation() throws Exception {
+ " 'fields.f0.aggregate-function' = 'collect'"
+ ")");

sql(
"INSERT INTO INPUT VALUES "
+ "(1, CAST (NULL AS ARRAY<STRING>)), "
+ "(2, ARRAY['A', 'B'])");
sql("INSERT INTO INPUT VALUES (2, ARRAY['C', 'D'])");

innerTestRetract();
}

@Test
public void testRetractWithPartialUpdate() throws Exception {
sql("CREATE TABLE INPUT (id INT PRIMARY KEY NOT ENFORCED, f0 ARRAY<STRING>, f1 INT)");
sql(
"CREATE TABLE test_collect("
+ " id INT PRIMARY KEY NOT ENFORCED,"
Expand All @@ -1548,31 +1545,36 @@ public void testRetractWithPartialUpdate() throws Exception {
+ " 'fields.f1.sequence-group' = 'f0'"
+ ")");

sql(
"INSERT INTO INPUT VALUES "
+ "(1, CAST (NULL AS ARRAY<STRING>), 10), "
+ "(2, ARRAY['A', 'B'], 10)");
sql("INSERT INTO INPUT VALUES (2, ARRAY['C', 'D'], 20)");

innerTestRetract();
}

private void innerTestRetract() throws Exception {
CloseableIterator<Row> insert =
streamSqlIter(
"INSERT INTO test_collect SELECT * FROM INPUT /*+ OPTIONS('scan.snapshot-id'='1') */");
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, new String[] {"A", "B"}, 10),
Row.ofKind(
RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}, 10),
Row.ofKind(
RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"}, 20)));
String temporaryTable =
"CREATE TEMPORARY TABLE INPUT ("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 ARRAY<STRING>,"
+ " f1 INT) WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'data-id' = '%s',\n"
+ " 'bounded' = 'true',\n"
+ " 'changelog-mode' = 'I,UA,UB'\n"
+ ")";
streamSqlIter(temporaryTable, dataId).close();

CommonTestUtils.waitUtil(
() -> sql("SELECT * FROM test_collect").size() == 2,
Duration.ofMinutes(2),
Duration.ofMillis(100));
sEnv.executeSql("INSERT INTO test_collect SELECT * FROM INPUT").await();

List<Row> result = sql("SELECT * FROM test_collect");
checkOneRecord(result.get(0), 1);
assertThat(result.size()).isEqualTo(1);
// if not retracted, the result would be ['A', 'B', 'C', 'D']
checkOneRecord(result.get(1), 2, "C", "D");

insert.close();
checkOneRecord(result.get(0), 1, "C", "D");
}

private void checkOneRecord(Row row, int id, String... elements) {
Expand Down

0 comments on commit fe6d64b

Please sign in to comment.