diff --git a/.github/workflows/e2e-tests-1.18-jdk11.yml b/.github/workflows/e2e-tests-1.18-jdk11.yml
index a8b42a6df641..b924a3b07628 100644
--- a/.github/workflows/e2e-tests-1.18-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.18-jdk11.yml
@@ -46,13 +46,13 @@ jobs:
distribution: 'adopt'
- name: Build Flink 1.18
run: mvn -T 1C -B clean install -DskipTests
- - name: Test Flink 1.17
+ - name: Test Flink 1.18
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.18.yml b/.github/workflows/e2e-tests-1.18.yml
index 2985b45c46e9..2f566004a241 100644
--- a/.github/workflows/e2e-tests-1.18.yml
+++ b/.github/workflows/e2e-tests-1.18.yml
@@ -52,6 +52,6 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.19-jdk11.yml b/.github/workflows/e2e-tests-1.19-jdk11.yml
new file mode 100644
index 000000000000..bc917f453f0f
--- /dev/null
+++ b/.github/workflows/e2e-tests-1.19-jdk11.yml
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+name: End to End Tests Flink 1.19 on JDK 11
+
+on:
+ issue_comment:
+ types: [created, edited, deleted]
+
+ # daily run
+ schedule:
+ - cron: "0 0 * * *"
+
+env:
+ JDK_VERSION: 11
+
+jobs:
+ build:
+ if: |
+ github.event_name == 'schedule' ||
+ (contains(github.event.comment.html_url, '/pull/') && contains(github.event.comment.body, '/jdk11'))
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v2
+ - name: Set up JDK ${{ env.JDK_VERSION }}
+ uses: actions/setup-java@v2
+ with:
+ java-version: ${{ env.JDK_VERSION }}
+ distribution: 'adopt'
+ - name: Build Flink 1.19
+ run: mvn -T 1C -B clean install -DskipTests
+ - name: Test Flink 1.19
+ timeout-minutes: 60
+ run: |
+ # run tests with random timezone to find out timezone related bugs
+ . .github/workflows/utils.sh
+ jvm_timezone=$(random_timezone)
+ echo "JVM timezone is set to $jvm_timezone"
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ env:
+ MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.19.yml b/.github/workflows/e2e-tests-1.19.yml
new file mode 100644
index 000000000000..b451d6385a9f
--- /dev/null
+++ b/.github/workflows/e2e-tests-1.19.yml
@@ -0,0 +1,57 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+name: End to End Tests Flink 1.19
+
+on:
+ push:
+ pull_request:
+ paths-ignore:
+ - 'docs/**'
+ - '**/*.md'
+
+env:
+ JDK_VERSION: 8
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
+ cancel-in-progress: true
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v2
+ - name: Set up JDK ${{ env.JDK_VERSION }}
+ uses: actions/setup-java@v2
+ with:
+ java-version: ${{ env.JDK_VERSION }}
+ distribution: 'adopt'
+ - name: Build Flink 1.19
+ run: mvn -T 1C -B clean install -DskipTests
+ - name: Test Flink 1.19
+ timeout-minutes: 60
+ run: |
+ # run tests with random timezone to find out timezone related bugs
+ . .github/workflows/utils.sh
+ jvm_timezone=$(random_timezone)
+ echo "JVM timezone is set to $jvm_timezone"
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ env:
+ MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/unitcase-flink-jdk11.yml b/.github/workflows/unitcase-flink-jdk11.yml
index 59dd1457ce0f..135dc5718f23 100644
--- a/.github/workflows/unitcase-flink-jdk11.yml
+++ b/.github/workflows/unitcase-flink-jdk11.yml
@@ -52,6 +52,11 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+ test_modules=""
+ for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
+ test_modules+="org.apache.paimon:paimon-flink-${suffix},"
+ done
+ test_modules="${test_modules%,}"
+ mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink.yml b/.github/workflows/utitcase-flink.yml
index b6cc9cbd128b..c7455e8143b4 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink.yml
@@ -52,7 +52,7 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
test_modules=""
- for suffix in 1.15 1.16 1.17 1.18 common; do
+ for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 604eb04c7b8b..fdcdbda7ab28 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -34,7 +34,6 @@ under the License.
2.8.3-10.0
2.3.0
- flink-sql-connector-kafka
flink-sql-connector-hive-2.3.9_${scala.binary.version}
@@ -210,7 +209,7 @@ under the License.
org.apache.flink
- ${flink.sql.connector.kafka}
+ flink-sql-connector-kafka
${test.flink.connector.kafka.version}
flink-sql-connector-kafka.jar
jar
@@ -276,6 +275,14 @@ under the License.
+
+ flink-1.18
+
+ 1.18
+ 1.18.1
+
+
+
flink-1.17
@@ -300,7 +307,6 @@ under the License.
1.15
1.15.3
${test.flink.version}
- flink-sql-connector-kafka
flink-sql-connector-hive-2.3.6_${scala.binary.version}
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
new file mode 100644
index 000000000000..a51d9e02e743
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.options.MemorySize;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+
+/** Utils for using Flink managed memory. */
+public class ManagedMemoryUtils {
+
+ public static void declareManagedMemory(DataStream> dataStream, MemorySize memorySize) {
+ dataStream
+ .getTransformation()
+ .declareManagedMemoryUseCaseAtOperatorScope(
+ ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
+ }
+
+ public static long computeManagedMemory(AbstractStreamOperator> operator) {
+ final Environment environment = operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index e1c62e28f599..5e9b7c7ceb53 100644
--- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -329,4 +329,24 @@ public void testUnsupportedEventual() {
"SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */"),
"File store continuous reading does not support eventual consistency mode");
}
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
}
diff --git a/paimon-flink/paimon-flink-1.16/pom.xml b/paimon-flink/paimon-flink-1.16/pom.xml
index 9aaa19228ed5..3558309e6a55 100644
--- a/paimon-flink/paimon-flink-1.16/pom.xml
+++ b/paimon-flink/paimon-flink-1.16/pom.xml
@@ -64,7 +64,7 @@ under the License.
org.apache.flink
- flink-table-common
+ flink-table-api-java-bridge
${flink.version}
provided
@@ -93,6 +93,13 @@ under the License.
test
+
+ org.apache.flink
+ flink-connector-files
+ ${flink.version}
+ test
+
+
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
new file mode 100644
index 000000000000..a51d9e02e743
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.options.MemorySize;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+
+/** Utils for using Flink managed memory. */
+public class ManagedMemoryUtils {
+
+ public static void declareManagedMemory(DataStream> dataStream, MemorySize memorySize) {
+ dataStream
+ .getTransformation()
+ .declareManagedMemoryUseCaseAtOperatorScope(
+ ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
+ }
+
+ public static long computeManagedMemory(AbstractStreamOperator> operator) {
+ final Environment environment = operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000000..84c84d1c68f9
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' = '1')");
+ }
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.17/pom.xml b/paimon-flink/paimon-flink-1.17/pom.xml
index 454386d59217..11318dbab686 100644
--- a/paimon-flink/paimon-flink-1.17/pom.xml
+++ b/paimon-flink/paimon-flink-1.17/pom.xml
@@ -68,6 +68,44 @@ under the License.
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+ provided
+
+
+
+
+
+ org.apache.paimon
+ paimon-flink-common
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+
+ org.apache.flink
+ flink-connector-files
+ ${flink.version}
+ test
+
diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
new file mode 100644
index 000000000000..a51d9e02e743
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.options.MemorySize;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+
+/** Utils for using Flink managed memory. */
+public class ManagedMemoryUtils {
+
+ public static void declareManagedMemory(DataStream> dataStream, MemorySize memorySize) {
+ dataStream
+ .getTransformation()
+ .declareManagedMemoryUseCaseAtOperatorScope(
+ ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
+ }
+
+ public static long computeManagedMemory(AbstractStreamOperator> operator) {
+ final Environment environment = operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000000..84c84d1c68f9
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' = '1')");
+ }
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml
index a0f83fa0d274..31e85df3b9f0 100644
--- a/paimon-flink/paimon-flink-1.18/pom.xml
+++ b/paimon-flink/paimon-flink-1.18/pom.xml
@@ -42,6 +42,12 @@ under the License.
org.apache.paimon
paimon-flink-common
${project.version}
+
+
+ *
+ *
+
+
@@ -55,6 +61,44 @@ under the License.
+
+
+ org.apache.flink
+ flink-table-api-java-bridge
+ ${flink.version}
+ provided
+
+
+
+
+
+ org.apache.paimon
+ paimon-flink-common
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+
+ org.apache.flink
+ flink-connector-files
+ ${flink.version}
+ test
+
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
new file mode 100644
index 000000000000..a51d9e02e743
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.options.MemorySize;
+
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+
+/** Utils for using Flink managed memory. */
+public class ManagedMemoryUtils {
+
+ public static void declareManagedMemory(DataStream> dataStream, MemorySize memorySize) {
+ dataStream
+ .getTransformation()
+ .declareManagedMemoryUseCaseAtOperatorScope(
+ ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
+ }
+
+ public static long computeManagedMemory(AbstractStreamOperator> operator) {
+ final Environment environment = operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000000..84c84d1c68f9
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' = '1')");
+ }
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.19/pom.xml b/paimon-flink/paimon-flink-1.19/pom.xml
new file mode 100644
index 000000000000..60a33df24bf2
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.19/pom.xml
@@ -0,0 +1,85 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.paimon
+ paimon-flink
+ 0.8-SNAPSHOT
+
+
+ jar
+
+ paimon-flink-1.19
+ Paimon : Flink : 1.19
+
+
+ 1.19.0
+
+
+
+
+ org.apache.paimon
+ paimon-flink-common
+ ${project.version}
+
+
+
+ org.apache.paimon
+ paimon-flink-cdc
+ ${project.version}
+
+
+ *
+ *
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade-paimon
+ package
+
+ shade
+
+
+
+
+ org.apache.paimon:paimon-flink-common
+ org.apache.paimon:paimon-flink-cdc
+
+
+
+
+
+
+
+
+
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index c0156e841cfa..9cba78f213fd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -131,8 +131,7 @@ public DataStreamSink> sinkFrom(
createCommitterFactory(),
createCommittableStateManager()))
.setParallelism(input.getParallelism());
- configureGlobalCommitter(
- committed, commitCpuCores, commitHeapMemory, env.getConfiguration());
+ configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index efca0b1bde64..eea8b9b581a7 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
Paimon : Flink : Common
- 1.18.1
+ 1.19.0
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 582fcfc35af2..97c426ee5685 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -52,7 +52,6 @@
import java.util.Set;
import java.util.UUID;
-import static org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -261,27 +260,18 @@ protected DataStreamSink> doCommit(DataStream written, String com
.setMaxParallelism(1);
Options options = Options.fromMap(table.options());
configureGlobalCommitter(
- committed,
- options.get(SINK_COMMITTER_CPU),
- options.get(SINK_COMMITTER_MEMORY),
- conf);
+ committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}
public static void configureGlobalCommitter(
SingleOutputStreamOperator> committed,
double cpuCores,
- @Nullable MemorySize heapMemory,
- ReadableConfig conf) {
+ @Nullable MemorySize heapMemory) {
if (heapMemory == null) {
return;
}
- if (!conf.get(ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT)) {
- throw new RuntimeException(
- "To support the 'sink.committer-cpu' and 'sink.committer-memory' configurations, you must enable fine-grained resource management. Please set 'cluster.fine-grained-resource-management.enabled' to 'true' in your Flink configuration.");
- }
-
SlotSharingGroup slotSharingGroup =
SlotSharingGroup.newBuilder(committed.getName())
.setCpuCores(cpuCores)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index a5c8e5557b94..b61fecab5d34 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -23,6 +23,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
@@ -96,7 +97,12 @@ public void open() throws Exception {
this.sinkContext = new SimpleContext(getProcessingTimeService());
if (logSinkFunction != null) {
- FunctionUtils.openFunction(logSinkFunction, new Configuration());
+ // to stay compatible with Flink 1.18-
+ if (logSinkFunction instanceof RichFunction) {
+ RichFunction richFunction = (RichFunction) logSinkFunction;
+ richFunction.open(new Configuration());
+ }
+
logCallback = new LogWriteCallback();
logSinkFunction.setWriteCallback(logCallback);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
index a51d9e02e743..82964e41cc05 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -43,6 +43,7 @@ public static long computeManagedMemory(AbstractStreamOperator> operator) {
operator.getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
environment.getTaskManagerInfo().getConfiguration(),
environment.getUserCodeClassLoader().asClassLoader()));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index 841a24ba65e1..a559d3350099 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -34,6 +34,7 @@
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.apache.paimon.CoreOptions.BUCKET;
@@ -80,13 +81,11 @@ public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
+ "INSERT INTO `T4` SELECT * FROM `S0`;\n"
+ "END";
- sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, path);
-
// step1: run streaming insert
JobClient jobClient = startJobAndCommitSnapshot(streamSql, null);
// step2: stop with savepoint
- stopJobSafely(jobClient);
+ String savepointPath = stopJobSafely(jobClient);
final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3");
assertThat(snapshotBeforeRescale).isNotNull();
@@ -107,6 +106,9 @@ public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData);
// step5: resume streaming job
+ sEnv.getConfig()
+ .getConfiguration()
+ .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
JobClient resumedJobClient =
startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id());
// stop job
@@ -144,11 +146,13 @@ private JobClient startJobAndCommitSnapshot(String sql, @Nullable Long initSnaps
return jobClient;
}
- private void stopJobSafely(JobClient client) throws ExecutionException, InterruptedException {
- client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT);
+ private String stopJobSafely(JobClient client) throws ExecutionException, InterruptedException {
+ CompletableFuture savepointPath =
+ client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT);
while (!client.getJobStatus().get().isGloballyTerminalState()) {
Thread.sleep(2000L);
}
+ return savepointPath.get();
}
private void assertLatestSchema(
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index 30ef22d8c573..caeb2fcea5c6 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -39,6 +39,7 @@ under the License.
paimon-flink-1.16
paimon-flink-1.17
paimon-flink-1.18
+ paimon-flink-1.19
paimon-flink-action
paimon-flink-cdc
@@ -95,6 +96,10 @@ under the License.
com.google.protobuf
protobuf-java
+
+ commons-io
+ commons-io
+
@@ -162,6 +167,7 @@ under the License.
paimon-flink-1.16
paimon-flink-1.17
paimon-flink-1.18
+ paimon-flink-1.19
paimon-flink-cdc
diff --git a/paimon-hive/paimon-hive-catalog/pom.xml b/paimon-hive/paimon-hive-catalog/pom.xml
index b50e58563750..f4c9a810bd63 100644
--- a/paimon-hive/paimon-hive-catalog/pom.xml
+++ b/paimon-hive/paimon-hive-catalog/pom.xml
@@ -87,31 +87,6 @@ under the License.
-
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
- provided
-
-
- org.apache.avro
- avro
-
-
- log4j
- log4j
-
-
- org.slf4j
- slf4j-log4j12
-
-
- jdk.tools
- jdk.tools
-
-
-
-
org.apache.hadoop
hadoop-hdfs
diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml
index 0f4733f001e3..8f2005e5678b 100644
--- a/paimon-hive/paimon-hive-connector-2.3/pom.xml
+++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml
@@ -127,31 +127,6 @@ under the License.
-
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
- test
-
-
- log4j
- log4j
-
-
- org.slf4j
- slf4j-log4j12
-
-
- org.pentaho
- *
-
-
- com.google.protobuf
- protobuf-java
-
-
-
-
org.apache.hadoop
hadoop-client
@@ -554,6 +529,10 @@ under the License.
org.apache.calcite
calcite-avatica
+
+ commons-io
+ commons-io
+
diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml
index 1792f032f01b..d52bc0b0b028 100644
--- a/paimon-hive/paimon-hive-connector-3.1/pom.xml
+++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml
@@ -141,31 +141,6 @@ under the License.
test
-
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
- test
-
-
- log4j
- log4j
-
-
- org.slf4j
- slf4j-log4j12
-
-
- org.pentaho
- *
-
-
- com.google.protobuf
- protobuf-java
-
-
-
-
org.apache.hadoop
hadoop-client
@@ -580,6 +555,10 @@ under the License.
org.apache.calcite
calcite-avatica
+
+ commons-io
+ commons-io
+
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index a9939854ce0e..7d5f1e551492 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -46,35 +46,6 @@ under the License.
${project.version}
-
- org.apache.hadoop
- hadoop-common
- ${hadoop.version}
- provided
-
-
- log4j
- log4j
-
-
- org.slf4j
- slf4j-log4j12
-
-
- org.pentaho
- *
-
-
- jdk.tools
- jdk.tools
-
-
- com.google.protobuf
- protobuf-java
-
-
-
-
org.apache.hadoop
hadoop-client
@@ -519,6 +490,12 @@ under the License.
avro
${avro.version}
test
+
+
+ org.apache.commons
+ commons-compress
+
+
**/*Test.*
- 1.18
- 1.18.1
+ 1.19
+ 1.19.0
+
3.0.1-1.18
3.0.11