Skip to content

Commit

Permalink
[ci] Fix paimon flink tests to cover submodule versions (#2949)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Mar 6, 2024
1 parent 4791d66 commit ec5ebeb
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 40 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/utitcase-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
test_modules=""
for suffix in 1.14 1.15 1.16 1.17 1.18 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)",
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED) WITH('changelog-producer'='input')");
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')",
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ " WITH ('changelog-producer'='input', 'bucket' = '1')");
}

@Test
Expand Down Expand Up @@ -202,12 +203,12 @@ public void testConfigureStartupTimestamp() throws Exception {
@Test
public void testConfigureStartupSnapshot() throws Exception {
// Configure 'scan.snapshot-id' without 'scan.mode'.
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
iterator.close();
Expand All @@ -221,13 +222,31 @@ public void testConfigureStartupSnapshot() throws Exception {
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
iterator.close();

iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
iterator.close();

iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 2));
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), (Row.of("10", "11", "12")));
iterator.close();

// Configure 'scan.snapshot-id' with 'scan.mode=latest'.
assertThatThrownBy(
() ->
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
0))
.hasMessageContaining("Unable to create a source for reading table");
.hasCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
"scan.snapshot-id must be null when you use latest for scan.mode");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)",
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED) WITH('changelog-producer'='input')");
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')",
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ " WITH ('changelog-producer'='input', 'bucket' = '1')");
}

@Test
Expand Down Expand Up @@ -202,12 +203,12 @@ public void testConfigureStartupTimestamp() throws Exception {
@Test
public void testConfigureStartupSnapshot() throws Exception {
// Configure 'scan.snapshot-id' without 'scan.mode'.
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')");
batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')");
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
iterator.close();
Expand All @@ -221,13 +222,31 @@ public void testConfigureStartupSnapshot() throws Exception {
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
iterator.close();

iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 1));
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
iterator.close();

iterator =
BlockingIterator.of(
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.snapshot-id'='%s') */", 2));
assertThat(iterator.collect(2))
.containsExactlyInAnyOrder(Row.of("7", "8", "9"), (Row.of("10", "11", "12")));
iterator.close();

// Configure 'scan.snapshot-id' with 'scan.mode=latest'.
assertThatThrownBy(
() ->
streamSqlIter(
"SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.snapshot-id'='%s') */",
0))
.hasMessageContaining("Unable to create a source for reading table");
.hasCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
"scan.snapshot-id must be null when you use latest for scan.mode");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testSetAndRemoveOption() throws Exception {
}

@Test
public void testSetAndResetImmutableOptions() throws Exception {
public void testSetAndResetImmutableOptions() {
// bucket-key is immutable
sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)");

Expand All @@ -53,7 +53,8 @@ public void testSetAndResetImmutableOptions() throws Exception {
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'bucket-key' is not supported yet.");

sql("CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket-key' = 'c')");
sql(
"CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')");
assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')"))
.getRootCause()
.isInstanceOf(UnsupportedOperationException.class)
Expand Down
29 changes: 2 additions & 27 deletions paimon-flink/paimon-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,34 +113,9 @@ under the License.
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
31 changes: 31 additions & 0 deletions paimon-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,37 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
Expand Down

0 comments on commit ec5ebeb

Please sign in to comment.