From 47bc30fd13f4e05798f35ff87a0dc83cb3ae67d6 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:36:07 +0800 Subject: [PATCH] [flink] Bump flink version to 1.19 (#3049) --- .github/workflows/e2e-tests-1.18-jdk11.yml | 4 +- .github/workflows/e2e-tests-1.18.yml | 2 +- .github/workflows/e2e-tests-1.19-jdk11.yml | 58 +++++++++++++ .github/workflows/e2e-tests-1.19.yml | 57 +++++++++++++ .github/workflows/unitcase-flink-jdk11.yml | 7 +- .github/workflows/utitcase-flink.yml | 2 +- paimon-e2e-tests/pom.xml | 12 ++- .../flink/utils/ManagedMemoryUtils.java | 49 +++++++++++ .../flink/ContinuousFileStoreITCase.java | 20 +++++ paimon-flink/paimon-flink-1.16/pom.xml | 9 +- .../flink/utils/ManagedMemoryUtils.java | 49 +++++++++++ .../flink/ContinuousFileStoreITCase.java | 59 +++++++++++++ paimon-flink/paimon-flink-1.17/pom.xml | 38 +++++++++ .../flink/utils/ManagedMemoryUtils.java | 49 +++++++++++ .../flink/ContinuousFileStoreITCase.java | 59 +++++++++++++ paimon-flink/paimon-flink-1.18/pom.xml | 44 ++++++++++ .../flink/utils/ManagedMemoryUtils.java | 49 +++++++++++ .../flink/ContinuousFileStoreITCase.java | 59 +++++++++++++ paimon-flink/paimon-flink-1.19/pom.xml | 85 +++++++++++++++++++ .../sink/cdc/FlinkCdcMultiTableSink.java | 3 +- paimon-flink/paimon-flink-common/pom.xml | 2 +- .../apache/paimon/flink/sink/FlinkSink.java | 14 +-- .../flink/sink/RowDataStoreWriteOperator.java | 8 +- .../flink/utils/ManagedMemoryUtils.java | 1 + .../paimon/flink/RescaleBucketITCase.java | 14 +-- paimon-flink/pom.xml | 6 ++ paimon-hive/paimon-hive-catalog/pom.xml | 25 ------ paimon-hive/paimon-hive-connector-2.3/pom.xml | 29 +------ paimon-hive/paimon-hive-connector-3.1/pom.xml | 29 +------ .../paimon-hive-connector-common/pom.xml | 43 +++------- .../paimon/hive/HiveCatalogITCaseBase.java | 4 + paimon-hive/pom.xml | 42 ++++++++- pom.xml | 5 +- 33 files changed, 799 insertions(+), 137 deletions(-) create mode 100644 .github/workflows/e2e-tests-1.19-jdk11.yml create mode 100644 .github/workflows/e2e-tests-1.19.yml create mode 100644 paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java create mode 100644 paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java create mode 100644 paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java create mode 100644 paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java create mode 100644 paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java create mode 100644 paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java create mode 100644 paimon-flink/paimon-flink-1.19/pom.xml diff --git a/.github/workflows/e2e-tests-1.18-jdk11.yml b/.github/workflows/e2e-tests-1.18-jdk11.yml index a8b42a6df641..b924a3b07628 100644 --- a/.github/workflows/e2e-tests-1.18-jdk11.yml +++ b/.github/workflows/e2e-tests-1.18-jdk11.yml @@ -46,13 +46,13 @@ jobs: distribution: 'adopt' - name: Build Flink 1.18 run: mvn -T 1C -B clean install -DskipTests - - name: Test Flink 1.17 + - name: Test Flink 1.18 timeout-minutes: 60 run: | # run tests with random timezone to find out timezone related bugs . .github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone + mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18 env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/e2e-tests-1.18.yml b/.github/workflows/e2e-tests-1.18.yml index 2985b45c46e9..2f566004a241 100644 --- a/.github/workflows/e2e-tests-1.18.yml +++ b/.github/workflows/e2e-tests-1.18.yml @@ -52,6 +52,6 @@ jobs: . .github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone + mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18 env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/e2e-tests-1.19-jdk11.yml b/.github/workflows/e2e-tests-1.19-jdk11.yml new file mode 100644 index 000000000000..bc917f453f0f --- /dev/null +++ b/.github/workflows/e2e-tests-1.19-jdk11.yml @@ -0,0 +1,58 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: End to End Tests Flink 1.19 on JDK 11 + +on: + issue_comment: + types: [created, edited, deleted] + + # daily run + schedule: + - cron: "0 0 * * *" + +env: + JDK_VERSION: 11 + +jobs: + build: + if: | + github.event_name == 'schedule' || + (contains(github.event.comment.html_url, '/pull/') && contains(github.event.comment.body, '/jdk11')) + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v2 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'adopt' + - name: Build Flink 1.19 + run: mvn -T 1C -B clean install -DskipTests + - name: Test Flink 1.19 + timeout-minutes: 60 + run: | + # run tests with random timezone to find out timezone related bugs + . .github/workflows/utils.sh + jvm_timezone=$(random_timezone) + echo "JVM timezone is set to $jvm_timezone" + mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone + env: + MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/e2e-tests-1.19.yml b/.github/workflows/e2e-tests-1.19.yml new file mode 100644 index 000000000000..b451d6385a9f --- /dev/null +++ b/.github/workflows/e2e-tests-1.19.yml @@ -0,0 +1,57 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: End to End Tests Flink 1.19 + +on: + push: + pull_request: + paths-ignore: + - 'docs/**' + - '**/*.md' + +env: + JDK_VERSION: 8 + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v2 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'adopt' + - name: Build Flink 1.19 + run: mvn -T 1C -B clean install -DskipTests + - name: Test Flink 1.19 + timeout-minutes: 60 + run: | + # run tests with random timezone to find out timezone related bugs + . .github/workflows/utils.sh + jvm_timezone=$(random_timezone) + echo "JVM timezone is set to $jvm_timezone" + mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone + env: + MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/unitcase-flink-jdk11.yml b/.github/workflows/unitcase-flink-jdk11.yml index 59dd1457ce0f..135dc5718f23 100644 --- a/.github/workflows/unitcase-flink-jdk11.yml +++ b/.github/workflows/unitcase-flink-jdk11.yml @@ -52,6 +52,11 @@ jobs: . .github/workflows/utils.sh jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" - mvn -T 1C -B clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone + test_modules="" + for suffix in 1.15 1.16 1.17 1.18 1.19 common; do + test_modules+="org.apache.paimon:paimon-flink-${suffix}," + done + test_modules="${test_modules%,}" + mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone env: MAVEN_OPTS: -Xmx4096m \ No newline at end of file diff --git a/.github/workflows/utitcase-flink.yml b/.github/workflows/utitcase-flink.yml index b6cc9cbd128b..c7455e8143b4 100644 --- a/.github/workflows/utitcase-flink.yml +++ b/.github/workflows/utitcase-flink.yml @@ -52,7 +52,7 @@ jobs: jvm_timezone=$(random_timezone) echo "JVM timezone is set to $jvm_timezone" test_modules="" - for suffix in 1.15 1.16 1.17 1.18 common; do + for suffix in 1.15 1.16 1.17 1.18 1.19 common; do test_modules+="org.apache.paimon:paimon-flink-${suffix}," done test_modules="${test_modules%,}" diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml index 604eb04c7b8b..fdcdbda7ab28 100644 --- a/paimon-e2e-tests/pom.xml +++ b/paimon-e2e-tests/pom.xml @@ -34,7 +34,6 @@ under the License. 2.8.3-10.0 2.3.0 - flink-sql-connector-kafka flink-sql-connector-hive-2.3.9_${scala.binary.version} @@ -210,7 +209,7 @@ under the License. org.apache.flink - ${flink.sql.connector.kafka} + flink-sql-connector-kafka ${test.flink.connector.kafka.version} flink-sql-connector-kafka.jar jar @@ -276,6 +275,14 @@ under the License. + + flink-1.18 + + 1.18 + 1.18.1 + + + flink-1.17 @@ -300,7 +307,6 @@ under the License. 1.15 1.15.3 ${test.flink.version} - flink-sql-connector-kafka flink-sql-connector-hive-2.3.6_${scala.binary.version} diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java new file mode 100644 index 000000000000..a51d9e02e743 --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.paimon.options.MemorySize; + +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; + +/** Utils for using Flink managed memory. */ +public class ManagedMemoryUtils { + + public static void declareManagedMemory(DataStream dataStream, MemorySize memorySize) { + dataStream + .getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes()); + } + + public static long computeManagedMemory(AbstractStreamOperator operator) { + final Environment environment = operator.getContainingTask().getEnvironment(); + return environment + .getMemoryManager() + .computeMemorySize( + operator.getOperatorConfig() + .getManagedMemoryFractionOperatorUseCaseOfSlot( + ManagedMemoryUseCase.OPERATOR, + environment.getTaskManagerInfo().getConfiguration(), + environment.getUserCodeClassLoader().asClassLoader())); + } +} diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index e1c62e28f599..5e9b7c7ceb53 100644 --- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -329,4 +329,24 @@ public void testUnsupportedEventual() { "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */"), "File store continuous reading does not support eventual consistency mode"); } + + @Test + public void testFlinkMemoryPool() { + // Check if the configuration is effective + assertThatThrownBy( + () -> + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1")) + .hasCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "Weights for operator scope use cases must be greater than 0."); + + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1"); + assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2); + } } diff --git a/paimon-flink/paimon-flink-1.16/pom.xml b/paimon-flink/paimon-flink-1.16/pom.xml index 9aaa19228ed5..3558309e6a55 100644 --- a/paimon-flink/paimon-flink-1.16/pom.xml +++ b/paimon-flink/paimon-flink-1.16/pom.xml @@ -64,7 +64,7 @@ under the License. org.apache.flink - flink-table-common + flink-table-api-java-bridge ${flink.version} provided @@ -93,6 +93,13 @@ under the License. test + + org.apache.flink + flink-connector-files + ${flink.version} + test + + diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java new file mode 100644 index 000000000000..a51d9e02e743 --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.paimon.options.MemorySize; + +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; + +/** Utils for using Flink managed memory. */ +public class ManagedMemoryUtils { + + public static void declareManagedMemory(DataStream dataStream, MemorySize memorySize) { + dataStream + .getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes()); + } + + public static long computeManagedMemory(AbstractStreamOperator operator) { + final Environment environment = operator.getContainingTask().getEnvironment(); + return environment + .getMemoryManager() + .computeMemorySize( + operator.getOperatorConfig() + .getManagedMemoryFractionOperatorUseCaseOfSlot( + ManagedMemoryUseCase.OPERATOR, + environment.getTaskManagerInfo().getConfiguration(), + environment.getUserCodeClassLoader().asClassLoader())); + } +} diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java new file mode 100644 index 000000000000..84c84d1c68f9 --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** SQL ITCase for continuous file store. */ +public class ContinuousFileStoreITCase extends CatalogITCaseBase { + + @Override + protected List ddl() { + return Arrays.asList( + "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')", + "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)" + + " WITH ('changelog-producer'='input', 'bucket' = '1')"); + } + + @Test + public void testFlinkMemoryPool() { + // Check if the configuration is effective + assertThatThrownBy( + () -> + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1")) + .hasCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "Weights for operator scope use cases must be greater than 0."); + + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1"); + assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2); + } +} diff --git a/paimon-flink/paimon-flink-1.17/pom.xml b/paimon-flink/paimon-flink-1.17/pom.xml index 454386d59217..11318dbab686 100644 --- a/paimon-flink/paimon-flink-1.17/pom.xml +++ b/paimon-flink/paimon-flink-1.17/pom.xml @@ -68,6 +68,44 @@ under the License. + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + + + + org.apache.paimon + paimon-flink-common + ${project.version} + test-jar + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-connector-files + ${flink.version} + test + diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java new file mode 100644 index 000000000000..a51d9e02e743 --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.paimon.options.MemorySize; + +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; + +/** Utils for using Flink managed memory. */ +public class ManagedMemoryUtils { + + public static void declareManagedMemory(DataStream dataStream, MemorySize memorySize) { + dataStream + .getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes()); + } + + public static long computeManagedMemory(AbstractStreamOperator operator) { + final Environment environment = operator.getContainingTask().getEnvironment(); + return environment + .getMemoryManager() + .computeMemorySize( + operator.getOperatorConfig() + .getManagedMemoryFractionOperatorUseCaseOfSlot( + ManagedMemoryUseCase.OPERATOR, + environment.getTaskManagerInfo().getConfiguration(), + environment.getUserCodeClassLoader().asClassLoader())); + } +} diff --git a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java new file mode 100644 index 000000000000..84c84d1c68f9 --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** SQL ITCase for continuous file store. */ +public class ContinuousFileStoreITCase extends CatalogITCaseBase { + + @Override + protected List ddl() { + return Arrays.asList( + "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')", + "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)" + + " WITH ('changelog-producer'='input', 'bucket' = '1')"); + } + + @Test + public void testFlinkMemoryPool() { + // Check if the configuration is effective + assertThatThrownBy( + () -> + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1")) + .hasCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "Weights for operator scope use cases must be greater than 0."); + + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1"); + assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2); + } +} diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml index a0f83fa0d274..31e85df3b9f0 100644 --- a/paimon-flink/paimon-flink-1.18/pom.xml +++ b/paimon-flink/paimon-flink-1.18/pom.xml @@ -42,6 +42,12 @@ under the License. org.apache.paimon paimon-flink-common ${project.version} + + + * + * + + @@ -55,6 +61,44 @@ under the License. + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + + + + org.apache.paimon + paimon-flink-common + ${project.version} + test-jar + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-connector-files + ${flink.version} + test + diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java new file mode 100644 index 000000000000..a51d9e02e743 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.paimon.options.MemorySize; + +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; + +/** Utils for using Flink managed memory. */ +public class ManagedMemoryUtils { + + public static void declareManagedMemory(DataStream dataStream, MemorySize memorySize) { + dataStream + .getTransformation() + .declareManagedMemoryUseCaseAtOperatorScope( + ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes()); + } + + public static long computeManagedMemory(AbstractStreamOperator operator) { + final Environment environment = operator.getContainingTask().getEnvironment(); + return environment + .getMemoryManager() + .computeMemorySize( + operator.getOperatorConfig() + .getManagedMemoryFractionOperatorUseCaseOfSlot( + ManagedMemoryUseCase.OPERATOR, + environment.getTaskManagerInfo().getConfiguration(), + environment.getUserCodeClassLoader().asClassLoader())); + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java new file mode 100644 index 000000000000..84c84d1c68f9 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** SQL ITCase for continuous file store. */ +public class ContinuousFileStoreITCase extends CatalogITCaseBase { + + @Override + protected List ddl() { + return Arrays.asList( + "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1')", + "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)" + + " WITH ('changelog-producer'='input', 'bucket' = '1')"); + } + + @Test + public void testFlinkMemoryPool() { + // Check if the configuration is effective + assertThatThrownBy( + () -> + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1")) + .hasCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage( + "Weights for operator scope use cases must be greater than 0."); + + batchSql( + "INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ " + + "VALUES ('1', '2', '3'), ('4', '5', '6')", + "T1"); + assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2); + } +} diff --git a/paimon-flink/paimon-flink-1.19/pom.xml b/paimon-flink/paimon-flink-1.19/pom.xml new file mode 100644 index 000000000000..60a33df24bf2 --- /dev/null +++ b/paimon-flink/paimon-flink-1.19/pom.xml @@ -0,0 +1,85 @@ + + + + 4.0.0 + + + org.apache.paimon + paimon-flink + 0.8-SNAPSHOT + + + jar + + paimon-flink-1.19 + Paimon : Flink : 1.19 + + + 1.19.0 + + + + + org.apache.paimon + paimon-flink-common + ${project.version} + + + + org.apache.paimon + paimon-flink-cdc + ${project.version} + + + * + * + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-paimon + package + + shade + + + + + org.apache.paimon:paimon-flink-common + org.apache.paimon:paimon-flink-cdc + + + + + + + + + diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index c0156e841cfa..9cba78f213fd 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -131,8 +131,7 @@ public DataStreamSink sinkFrom( createCommitterFactory(), createCommittableStateManager())) .setParallelism(input.getParallelism()); - configureGlobalCommitter( - committed, commitCpuCores, commitHeapMemory, env.getConfiguration()); + configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory); return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); } diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index efca0b1bde64..eea8b9b581a7 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -34,7 +34,7 @@ under the License. Paimon : Flink : Common - 1.18.1 + 1.19.0 diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 582fcfc35af2..97c426ee5685 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -52,7 +52,6 @@ import java.util.Set; import java.util.UUID; -import static org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT; import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT; @@ -261,27 +260,18 @@ protected DataStreamSink doCommit(DataStream written, String com .setMaxParallelism(1); Options options = Options.fromMap(table.options()); configureGlobalCommitter( - committed, - options.get(SINK_COMMITTER_CPU), - options.get(SINK_COMMITTER_MEMORY), - conf); + committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY)); return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); } public static void configureGlobalCommitter( SingleOutputStreamOperator committed, double cpuCores, - @Nullable MemorySize heapMemory, - ReadableConfig conf) { + @Nullable MemorySize heapMemory) { if (heapMemory == null) { return; } - if (!conf.get(ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT)) { - throw new RuntimeException( - "To support the 'sink.committer-cpu' and 'sink.committer-memory' configurations, you must enable fine-grained resource management. Please set 'cluster.fine-grained-resource-management.enabled' to 'true' in your Flink configuration."); - } - SlotSharingGroup slotSharingGroup = SlotSharingGroup.newBuilder(committed.getName()) .setCpuCores(cpuCores) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java index a5c8e5557b94..b61fecab5d34 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java @@ -23,6 +23,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.SinkRecord; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; @@ -96,7 +97,12 @@ public void open() throws Exception { this.sinkContext = new SimpleContext(getProcessingTimeService()); if (logSinkFunction != null) { - FunctionUtils.openFunction(logSinkFunction, new Configuration()); + // to stay compatible with Flink 1.18- + if (logSinkFunction instanceof RichFunction) { + RichFunction richFunction = (RichFunction) logSinkFunction; + richFunction.open(new Configuration()); + } + logCallback = new LogWriteCallback(); logSinkFunction.setWriteCallback(logCallback); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java index a51d9e02e743..82964e41cc05 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java @@ -43,6 +43,7 @@ public static long computeManagedMemory(AbstractStreamOperator operator) { operator.getOperatorConfig() .getManagedMemoryFractionOperatorUseCaseOfSlot( ManagedMemoryUseCase.OPERATOR, + environment.getJobConfiguration(), environment.getTaskManagerInfo().getConfiguration(), environment.getUserCodeClassLoader().asClassLoader())); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java index 841a24ba65e1..a559d3350099 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java @@ -34,6 +34,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import static org.apache.paimon.CoreOptions.BUCKET; @@ -80,13 +81,11 @@ public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception { + "INSERT INTO `T4` SELECT * FROM `S0`;\n" + "END"; - sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, path); - // step1: run streaming insert JobClient jobClient = startJobAndCommitSnapshot(streamSql, null); // step2: stop with savepoint - stopJobSafely(jobClient); + String savepointPath = stopJobSafely(jobClient); final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3"); assertThat(snapshotBeforeRescale).isNotNull(); @@ -107,6 +106,9 @@ public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception { assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData); // step5: resume streaming job + sEnv.getConfig() + .getConfiguration() + .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath); JobClient resumedJobClient = startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id()); // stop job @@ -144,11 +146,13 @@ private JobClient startJobAndCommitSnapshot(String sql, @Nullable Long initSnaps return jobClient; } - private void stopJobSafely(JobClient client) throws ExecutionException, InterruptedException { - client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT); + private String stopJobSafely(JobClient client) throws ExecutionException, InterruptedException { + CompletableFuture savepointPath = + client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT); while (!client.getJobStatus().get().isGloballyTerminalState()) { Thread.sleep(2000L); } + return savepointPath.get(); } private void assertLatestSchema( diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml index 30ef22d8c573..caeb2fcea5c6 100644 --- a/paimon-flink/pom.xml +++ b/paimon-flink/pom.xml @@ -39,6 +39,7 @@ under the License. paimon-flink-1.16 paimon-flink-1.17 paimon-flink-1.18 + paimon-flink-1.19 paimon-flink-action paimon-flink-cdc @@ -95,6 +96,10 @@ under the License. com.google.protobuf protobuf-java + + commons-io + commons-io + @@ -162,6 +167,7 @@ under the License. paimon-flink-1.16 paimon-flink-1.17 paimon-flink-1.18 + paimon-flink-1.19 paimon-flink-cdc diff --git a/paimon-hive/paimon-hive-catalog/pom.xml b/paimon-hive/paimon-hive-catalog/pom.xml index b50e58563750..f4c9a810bd63 100644 --- a/paimon-hive/paimon-hive-catalog/pom.xml +++ b/paimon-hive/paimon-hive-catalog/pom.xml @@ -87,31 +87,6 @@ under the License. - - org.apache.hadoop - hadoop-common - ${hadoop.version} - provided - - - org.apache.avro - avro - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - jdk.tools - jdk.tools - - - - org.apache.hadoop hadoop-hdfs diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml index 0f4733f001e3..8f2005e5678b 100644 --- a/paimon-hive/paimon-hive-connector-2.3/pom.xml +++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml @@ -127,31 +127,6 @@ under the License. - - org.apache.hadoop - hadoop-common - ${hadoop.version} - test - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - org.pentaho - * - - - com.google.protobuf - protobuf-java - - - - org.apache.hadoop hadoop-client @@ -554,6 +529,10 @@ under the License. org.apache.calcite calcite-avatica + + commons-io + commons-io + diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml index 1792f032f01b..d52bc0b0b028 100644 --- a/paimon-hive/paimon-hive-connector-3.1/pom.xml +++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml @@ -141,31 +141,6 @@ under the License. test - - org.apache.hadoop - hadoop-common - ${hadoop.version} - test - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - org.pentaho - * - - - com.google.protobuf - protobuf-java - - - - org.apache.hadoop hadoop-client @@ -580,6 +555,10 @@ under the License. org.apache.calcite calcite-avatica + + commons-io + commons-io + diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml index a9939854ce0e..7d5f1e551492 100644 --- a/paimon-hive/paimon-hive-connector-common/pom.xml +++ b/paimon-hive/paimon-hive-connector-common/pom.xml @@ -46,35 +46,6 @@ under the License. ${project.version} - - org.apache.hadoop - hadoop-common - ${hadoop.version} - provided - - - log4j - log4j - - - org.slf4j - slf4j-log4j12 - - - org.pentaho - * - - - jdk.tools - jdk.tools - - - com.google.protobuf - protobuf-java - - - - org.apache.hadoop hadoop-client @@ -519,6 +490,12 @@ under the License. avro ${avro.version} test + + + org.apache.commons + commons-compress + + **/*Test.* - 1.18 - 1.18.1 + 1.19 + 1.19.0 + 3.0.1-1.18 3.0.11