Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Jan 31, 2024
1 parent fe6d64b commit dfd22ef
Showing 1 changed file with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ public void testRetractWithAggregation() throws Exception {
+ " 'fields.f0.aggregate-function' = 'collect'"
+ ")");

innerTestRetract();
innerTestRetract(false);
}

@Test
Expand All @@ -1545,29 +1545,40 @@ public void testRetractWithPartialUpdate() throws Exception {
+ " 'fields.f1.sequence-group' = 'f0'"
+ ")");

innerTestRetract();
innerTestRetract(true);
}

private void innerTestRetract() throws Exception {
String dataId =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, new String[] {"A", "B"}, 10),
Row.ofKind(
RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}, 10),
Row.ofKind(
RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"}, 20)));
private void innerTestRetract(boolean partialUpdate) throws Exception {
String temporaryTable =
"CREATE TEMPORARY TABLE INPUT ("
+ " id INT PRIMARY KEY NOT ENFORCED,"
+ " f0 ARRAY<STRING>,"
+ " f1 INT) WITH (\n"
+ " f0 ARRAY<STRING>"
+ " %s) WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'data-id' = '%s',\n"
+ " 'bounded' = 'true',\n"
+ " 'changelog-mode' = 'I,UA,UB'\n"
+ ")";
streamSqlIter(temporaryTable, dataId).close();

String f1;
List<Row> inputRecords;
if (partialUpdate) {
f1 = ", f1 INT";
inputRecords =
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, new String[] {"A", "B"}, 10),
Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}, 10),
Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"}, 20));
} else {
f1 = "";
inputRecords =
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, new String[] {"A", "B"}),
Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}),
Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"}));
}
streamSqlIter(temporaryTable, f1, TestValuesTableFactory.registerData(inputRecords))
.close();

sEnv.executeSql("INSERT INTO test_collect SELECT * FROM INPUT").await();

Expand Down

0 comments on commit dfd22ef

Please sign in to comment.