Skip to content

Commit

Permalink
[flink] Bump flink version to 1.19 (apache#3049)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Mar 20, 2024
1 parent 21e016a commit 47bc30f
Show file tree
Hide file tree
Showing 33 changed files with 799 additions and 137 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/e2e-tests-1.18-jdk11.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ jobs:
distribution: 'adopt'
- name: Build Flink 1.18
run: mvn -T 1C -B clean install -DskipTests
- name: Test Flink 1.17
- name: Test Flink 1.18
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/e2e-tests-1.18.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18
env:
MAVEN_OPTS: -Xmx4096m
58 changes: 58 additions & 0 deletions .github/workflows/e2e-tests-1.19-jdk11.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

name: End to End Tests Flink 1.19 on JDK 11

on:
issue_comment:
types: [created, edited, deleted]

# daily run
schedule:
- cron: "0 0 * * *"

env:
JDK_VERSION: 11

jobs:
build:
if: |
github.event_name == 'schedule' ||
(contains(github.event.comment.html_url, '/pull/') && contains(github.event.comment.body, '/jdk11'))
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up JDK ${{ env.JDK_VERSION }}
uses: actions/setup-java@v2
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.19
run: mvn -T 1C -B clean install -DskipTests
- name: Test Flink 1.19
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
57 changes: 57 additions & 0 deletions .github/workflows/e2e-tests-1.19.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

name: End to End Tests Flink 1.19

on:
push:
pull_request:
paths-ignore:
- 'docs/**'
- '**/*.md'

env:
JDK_VERSION: 8

concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
cancel-in-progress: true

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up JDK ${{ env.JDK_VERSION }}
uses: actions/setup-java@v2
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Build Flink 1.19
run: mvn -T 1C -B clean install -DskipTests
- name: Test Flink 1.19
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
7 changes: 6 additions & 1 deletion .github/workflows/unitcase-flink-jdk11.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
mvn -T 1C -B clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
test_modules=""
for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/utitcase-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
test_modules=""
for suffix in 1.15 1.16 1.17 1.18 common; do
for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
Expand Down
12 changes: 9 additions & 3 deletions paimon-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ under the License.
<properties>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
<flink.cdc.version>2.3.0</flink.cdc.version>
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
</properties>

Expand Down Expand Up @@ -210,7 +209,7 @@ under the License.
<!-- test paimon with kafka sql jar -->
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.sql.connector.kafka}</artifactId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>${test.flink.connector.kafka.version}</version>
<destFileName>flink-sql-connector-kafka.jar</destFileName>
<type>jar</type>
Expand Down Expand Up @@ -276,6 +275,14 @@ under the License.

<profiles>
<!-- Activate these profiles with -Pflink-x.xx to build and test against different Flink versions -->
<profile>
<id>flink-1.18</id>
<properties>
<test.flink.main.version>1.18</test.flink.main.version>
<test.flink.version>1.18.1</test.flink.version>
</properties>
</profile>

<profile>
<id>flink-1.17</id>
<properties>
Expand All @@ -300,7 +307,6 @@ under the License.
<test.flink.main.version>1.15</test.flink.main.version>
<test.flink.version>1.15.3</test.flink.version>
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
</properties>
</profile>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.paimon.options.MemorySize;

import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

/** Utils for using Flink managed memory. */
public class ManagedMemoryUtils {

public static void declareManagedMemory(DataStream<?> dataStream, MemorySize memorySize) {
dataStream
.getTransformation()
.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
}

public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
final Environment environment = operator.getContainingTask().getEnvironment();
return environment
.getMemoryManager()
.computeMemorySize(
operator.getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.OPERATOR,
environment.getTaskManagerInfo().getConfiguration(),
environment.getUserCodeClassLoader().asClassLoader()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,24 @@ public void testUnsupportedEventual() {
"SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */"),
"File store continuous reading does not support eventual consistency mode");
}

@Test
public void testFlinkMemoryPool() {
// Check if the configuration is effective
assertThatThrownBy(
() ->
batchSql(
"INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='0M') */ "
+ "VALUES ('1', '2', '3'), ('4', '5', '6')",
"T1"))
.hasCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
"Weights for operator scope use cases must be greater than 0.");

batchSql(
"INSERT INTO %s /*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='1M') */ "
+ "VALUES ('1', '2', '3'), ('4', '5', '6')",
"T1");
assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
}
}
9 changes: 8 additions & 1 deletion paimon-flink/paimon-flink-1.16/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand Down Expand Up @@ -93,6 +93,13 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.paimon.options.MemorySize;

import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

/** Utils for using Flink managed memory. */
public class ManagedMemoryUtils {

public static void declareManagedMemory(DataStream<?> dataStream, MemorySize memorySize) {
dataStream
.getTransformation()
.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.OPERATOR, memorySize.getMebiBytes());
}

public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
final Environment environment = operator.getContainingTask().getEnvironment();
return environment
.getMemoryManager()
.computeMemorySize(
operator.getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.OPERATOR,
environment.getTaskManagerInfo().getConfiguration(),
environment.getUserCodeClassLoader().asClassLoader()));
}
}
Loading

0 comments on commit 47bc30f

Please sign in to comment.