[flink] Support compact procedure
yuzelin committed Sep 14, 2023
1 parent 18529b8 commit c511ee1
Showing 29 changed files with 593 additions and 174 deletions.
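
For context, the compact procedure added by this commit is invoked through Flink SQL's CALL statement (procedure support is new in Flink 1.18). The sketch below is illustrative rather than taken from the commit: the catalog name and warehouse path are placeholders, and the call signature mirrors the FlinkProceduresE2eTest further down.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactProcedureExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a Paimon catalog; the warehouse path is a placeholder.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ("
                        + " 'type' = 'paimon',"
                        + " 'warehouse' = 'file:///tmp/paimon'"
                        + ")");
        tEnv.executeSql("USE CATALOG paimon");
        // The e2e test sets a checkpoint interval before calling the procedure.
        tEnv.executeSql("SET 'execution.checkpointing.interval' = '1s'");
        // Table identifier followed by optional partition filters, as in the e2e test below.
        tEnv.executeSql("CALL compact('default.ts_table', 'dt=20221205', 'dt=20221206')")
                .await();
    }
}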
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.14-jdk11.yml
@@ -52,6 +52,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests -Pflink-1.14
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.14
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone -Pflink-1.14
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.14.yml
@@ -52,6 +52,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests -Pflink-1.14
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.14
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone -Pflink-1.14
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.15-jdk11.yml
@@ -52,6 +52,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests -Pflink-1.15
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.15
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone -Pflink-1.15
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.15.yml
@@ -52,6 +52,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests -Pflink-1.15
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.15
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone -Pflink-1.15
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.16-jdk11.yml
@@ -52,6 +52,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.16.yml
@@ -52,6 +52,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests -Pflink-1.16
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.16
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone -Pflink-1.16
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.17-jdk11.yml
@@ -52,6 +52,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.17.yml
@@ -51,6 +51,6 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest='!FlinkProceduresE2eTest' -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
56 changes: 56 additions & 0 deletions .github/workflows/procedures-tests.yml
@@ -0,0 +1,56 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

name: End to End Procedures Tests (Flink 1.18)

on:
push:
pull_request:
paths-ignore:
- 'docs/**'
- '**/*.md'

env:
JDK_VERSION: 8

concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
cancel-in-progress: true

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up JDK ${{ env.JDK_VERSION }}
uses: actions/setup-java@v2
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build and Test Flink 1.18 Procedures
timeout-minutes: 10
run: |
# run tests with random timezone to find out timezone related bugs
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -DskipTests -Pflink-1.18
mvn -T 1C -B test -pl paimon-e2e-tests -Dtest=FlinkProceduresE2eTest -Duser.timezone=$jvm_timezone -Pflink-1.18
env:
MAVEN_OPTS: -Xmx4096m
paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -157,7 +157,10 @@ Map<String, Map<String, Path>> allTablePaths() {
}
}

protected abstract String warehouse();
@Override
public Map<String, String> options() {
return catalogOptions;
}

protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;
paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -27,6 +27,7 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
@@ -212,6 +213,12 @@ default boolean caseSensitive() {
return true;
}

/** Return the warehouse path. */
String warehouse();

/** Return the catalog options. */
Map<String, String> options();

/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
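
The two methods added to Catalog above expose the warehouse path and the original catalog options. A plausible use, sketched below under the assumption of Paimon's CatalogContext/CatalogFactory APIs, is rebuilding an equivalent catalog from an existing instance (for example, so a procedure can hand the configuration to a separately submitted compaction job); the helper name is hypothetical.

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

public class CatalogOptionsSketch {

    /** Hypothetical helper: recreate a catalog from the options of an existing one. */
    static Catalog recreate(Catalog catalog) {
        Map<String, String> options = new HashMap<>(catalog.options());
        // Ensure the warehouse path is present in case it is not part of the options map.
        options.put("warehouse", catalog.warehouse());
        return CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
    }
}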
paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -231,7 +231,7 @@ private static String database(Path path) {
public void close() throws Exception {}

@Override
protected String warehouse() {
public String warehouse() {
return warehouse.toString();
}
}
8 changes: 8 additions & 0 deletions paimon-e2e-tests/pom.xml
@@ -276,6 +276,14 @@ under the License.

<profiles>
<!-- Activate these profiles with -Pflink-x.xx to build and test against different Flink versions -->
<profile>
<id>flink-1.18</id>
<properties>
<test.flink.main.version>1.18</test.flink.main.version>
<test.flink.version>1.18-SNAPSHOT</test.flink.version>
</properties>
</profile>

<profile>
<id>flink-1.16</id>
<properties>
119 changes: 119 additions & 0 deletions paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.tests;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import java.util.UUID;

/** Tests for flink {@code Procedure}s. */
@EnabledIfSystemProperty(named = "test.flink.version", matches = "1.18.*")
public class FlinkProceduresE2eTest extends E2eTestBase {

public FlinkProceduresE2eTest() {
super(true, true);
}

@Test
public void testCompact() throws Exception {
String topicName = "ts-topic-" + UUID.randomUUID();
createKafkaTopic(topicName, 1);
// prepare first part of test data
sendKafkaMessage("1.csv", "20221205,1,100\n20221206,1,100\n20221207,1,100", topicName);

// create hive catalog to test catalog loading
String warehouse = HDFS_ROOT + "/" + UUID.randomUUID() + ".warehouse";
String catalogDdl =
String.format(
"CREATE CATALOG ts_catalog WITH (\n"
+ " 'type' = 'paimon',\n"
+ " 'warehouse' = '%s',\n"
+ " 'metastore' = 'hive',\n"
+ " 'uri' = 'thrift://hive-metastore:9083'\n"
+ ");",
warehouse);
String useCatalogCmd = "USE CATALOG ts_catalog;";

String testDataSourceDdl =
String.format(
"CREATE TEMPORARY TABLE test_source (\n"
+ " dt STRING,\n"
+ " k INT,\n"
+ " v INT"
+ ") WITH (\n"
+ " 'connector' = 'kafka',\n"
+ " 'properties.bootstrap.servers' = 'kafka:9092',\n"
+ " 'properties.group.id' = 'testGroup',\n"
+ " 'scan.startup.mode' = 'earliest-offset',\n"
+ " 'topic' = '%s',\n"
+ " 'format' = 'csv'\n"
+ ");",
topicName);

String tableDdl =
"CREATE TABLE IF NOT EXISTS ts_table (\n"
+ " dt STRING,\n"
+ " k INT,\n"
+ " v INT,\n"
+ " PRIMARY KEY (dt, k) NOT ENFORCED\n"
+ ") PARTITIONED BY (dt) WITH (\n"
+ " 'changelog-producer' = 'full-compaction',\n"
+ " 'changelog-producer.compaction-interval' = '1s',\n"
+ " 'write-only' = 'true'\n"
+ ");";

// insert data into paimon
runSql(
"SET 'execution.checkpointing.interval' = '1s';\n"
+ "INSERT INTO ts_table SELECT * FROM test_source;",
catalogDdl,
useCatalogCmd,
tableDdl,
testDataSourceDdl);

// execute compact procedure
runSql(
"SET 'execution.checkpointing.interval' = '1s';\n"
+ "CALL compact('default.ts_table', 'dt=20221205', 'dt=20221206');",
catalogDdl,
useCatalogCmd);

// read all data from paimon
runSql(
"INSERT INTO result1 SELECT * FROM ts_table;",
catalogDdl,
useCatalogCmd,
tableDdl,
createResultSink("result1", "dt STRING, k INT, v INT"));

// check that first part of test data are compacted
checkResult("20221205, 1, 100", "20221206, 1, 100");

// prepare second part of test data
sendKafkaMessage("2.csv", "20221205,1,101\n20221206,1,101\n20221207,1,101", topicName);

// check that second part of test data are compacted
checkResult("20221205, 1, 101", "20221206, 1, 101");
}

private void runSql(String sql, String... ddls) throws Exception {
runSql(String.join("\n", ddls) + "\n" + sql);
}
}
85 changes: 85 additions & 0 deletions paimon-flink/paimon-flink-1.18/pom.xml
@@ -0,0 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink</artifactId>
<version>0.6-SNAPSHOT</version>
</parent>

<packaging>jar</packaging>

<artifactId>paimon-flink-1.18</artifactId>
<name>Paimon : Flink : 1.18</name>

<properties>
<flink.version>1.18-SNAPSHOT</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-cdc</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-paimon</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes combine.children="append">
<include>org.apache.paimon:paimon-flink-common</include>
<include>org.apache.paimon:paimon-flink-cdc</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
(diffs for the remaining changed files were not loaded)
